9ca75f764399f276e4270045cb6a0651dfe8b896
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12 import urllib2
13 from functools import partial
14
15 from mininet.net import Dockernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
24 class DCNetwork(Dockernet):
25 """
26 Wraps the original Mininet/Dockernet class and provides
27 methods to add data centers, switches, etc.
28
29 This class is used by topology definition scripts.
30 """
31
32 def __init__(self, controller=RemoteController, monitor=False,
33 enable_learning = True, # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
34 dc_emulation_max_cpu=1.0, # fraction of overall CPU time for emulation
35 dc_emulation_max_mem=512, # emulation max mem in MB
36 **kwargs):
37 """
38 Create an extended version of a Dockernet network
39 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
40 :param kwargs: path through for Mininet parameters
41 :return:
42 """
43 self.dcs = {}
44
45 # call original Docker.__init__ and setup default controller
46 Dockernet.__init__(
47 self, switch=OVSKernelSwitch, controller=controller, **kwargs)
48
49 # Ryu management
50 self.ryu_process = None
51 if controller == RemoteController:
52 # start Ryu controller
53 self.startRyu(learning_switch=enable_learning)
54
55 # add the specified controller
56 self.addController('c0', controller=controller)
57
58 # graph of the complete DC network
59 self.DCNetwork_graph = nx.MultiDiGraph()
60
61 # initialize pool of vlan tags to setup the SDN paths
62 self.vlans = range(4096)[::-1]
63
64 # link to Ryu REST_API
65 ryu_ip = '0.0.0.0'
66 ryu_port = '8080'
67 self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)
68
69 # monitoring agent
70 if monitor:
71 self.monitor_agent = DCNetworkMonitor(self)
72 else:
73 self.monitor_agent = None
74
75 # initialize resource model registrar
76 self.rm_registrar = ResourceModelRegistrar(
77 dc_emulation_max_cpu, dc_emulation_max_mem)
78
79 def addDatacenter(self, label, metadata={}, resource_log_path=None):
80 """
81 Create and add a logical cloud data center to the network.
82 """
83 if label in self.dcs:
84 raise Exception("Data center label already exists: %s" % label)
85 dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
86 dc.net = self # set reference to network
87 self.dcs[label] = dc
88 dc.create() # finally create the data center in our Mininet instance
89 logging.info("added data center: %s" % label)
90 return dc
91
92 def addLink(self, node1, node2, **params):
93 """
94 Able to handle Datacenter objects as link
95 end points.
96 """
97 assert node1 is not None
98 assert node2 is not None
99 logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
100 # ensure type of node1
101 if isinstance( node1, basestring ):
102 if node1 in self.dcs:
103 node1 = self.dcs[node1].switch
104 if isinstance( node1, Datacenter ):
105 node1 = node1.switch
106 # ensure type of node2
107 if isinstance( node2, basestring ):
108 if node2 in self.dcs:
109 node2 = self.dcs[node2].switch
110 if isinstance( node2, Datacenter ):
111 node2 = node2.switch
112 # try to give containers a default IP
113 if isinstance( node1, Docker ):
114 if "params1" not in params:
115 params["params1"] = {}
116 if "ip" not in params["params1"]:
117 params["params1"]["ip"] = self.getNextIp()
118 if isinstance( node2, Docker ):
119 if "params2" not in params:
120 params["params2"] = {}
121 if "ip" not in params["params2"]:
122 params["params2"]["ip"] = self.getNextIp()
123 # ensure that we allow TCLinks between data centers
124 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
125 # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
126 if "cls" not in params:
127 params["cls"] = TCLink
128
129 link = Dockernet.addLink(self, node1, node2, **params)
130
131 # try to give container interfaces a default id
132 node1_port_id = node1.ports[link.intf1]
133 if isinstance(node1, Docker):
134 if "id" in params["params1"]:
135 node1_port_id = params["params1"]["id"]
136 node1_port_name = link.intf1.name
137
138 node2_port_id = node2.ports[link.intf2]
139 if isinstance(node2, Docker):
140 if "id" in params["params2"]:
141 node2_port_id = params["params2"]["id"]
142 node2_port_name = link.intf2.name
143
144
145 # add edge and assigned port number to graph in both directions between node1 and node2
146 # port_id: id given in descriptor (if available, otherwise same as port)
147 # port: portnumber assigned by Dockernet
148
149 attr_dict = {}
150 # possible weight metrics allowed by TClink class:
151 weight_metrics = ['bw', 'delay', 'jitter', 'loss']
152 edge_attributes = [p for p in params if p in weight_metrics]
153 for attr in edge_attributes:
154 # if delay: strip ms (need number as weight in graph)
155 match = re.search('([0-9]*\.?[0-9]+)', params[attr])
156 if match:
157 attr_number = match.group(1)
158 else:
159 attr_number = None
160 attr_dict[attr] = attr_number
161
162
163 attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
164 'src_port_name': node1_port_name,
165 'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
166 'dst_port_name': node2_port_name}
167 attr_dict2.update(attr_dict)
168 self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)
169
170 attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
171 'src_port_name': node2_port_name,
172 'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
173 'dst_port_name': node1_port_name}
174 attr_dict2.update(attr_dict)
175 self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)
176
177 return link
178
179 def addDocker( self, label, **params ):
180 """
181 Wrapper for addDocker method to use custom container class.
182 """
183 self.DCNetwork_graph.add_node(label)
184 return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)
185
186 def removeDocker( self, label, **params ):
187 """
188 Wrapper for removeDocker method to update graph.
189 """
190 self.DCNetwork_graph.remove_node(label)
191 return Dockernet.removeDocker(self, label, **params)
192
193 def addSwitch( self, name, add_to_graph=True, **params ):
194 """
195 Wrapper for addSwitch method to store switch also in graph.
196 """
197 if add_to_graph:
198 self.DCNetwork_graph.add_node(name)
199 return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
200
201 def getAllContainers(self):
202 """
203 Returns a list with all containers within all data centers.
204 """
205 all_containers = []
206 for dc in self.dcs.itervalues():
207 all_containers += dc.listCompute()
208 return all_containers
209
210 def start(self):
211 # start
212 for dc in self.dcs.itervalues():
213 dc.start()
214 Dockernet.start(self)
215
216 def stop(self):
217
218 # stop the monitor agent
219 if self.monitor_agent is not None:
220 self.monitor_agent.stop()
221
222 # stop emulator net
223 Dockernet.stop(self)
224
225 # stop Ryu controller
226 self.stopRyu()
227
228
229 def CLI(self):
230 CLI(self)
231
232 # to remove chain do setChain( src, dst, cmd='del-flows')
233 def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
234 cmd = kwargs.get('cmd')
235 if cmd == 'add-flow':
236 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
237 if kwargs.get('bidirectional'):
238 return ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
239
240 elif cmd == 'del-flows': # TODO: del-flow to be implemented
241 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
242 if kwargs.get('bidirectional'):
243 return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
244
245 else:
246 return "Command unknown"
247
248
249 def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
250
251 # TODO: this needs to be cleaned up
252 #check if port is specified (vnf:port)
253 if vnf_src_interface is None:
254 # take first interface by default
255 connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
256 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
257 vnf_src_interface = link_dict[0]['src_port_id']
258
259 for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
260 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
261 for link in link_dict:
262 if link_dict[link]['src_port_id'] == vnf_src_interface:
263 # found the right link and connected switch
264 src_sw = connected_sw
265
266 src_sw_inport_nr = link_dict[link]['dst_port_nr']
267 break
268
269 if vnf_dst_interface is None:
270 # take first interface by default
271 connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
272 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
273 vnf_dst_interface = link_dict[0]['dst_port_id']
274
275 vnf_dst_name = vnf_dst_name.split(':')[0]
276 for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
277 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
278 for link in link_dict:
279 if link_dict[link]['dst_port_id'] == vnf_dst_interface:
280 # found the right link and connected switch
281 dst_sw = connected_sw
282 dst_sw_outport_nr = link_dict[link]['src_port_nr']
283 break
284
285
286 # get shortest path
287 try:
288 # returns the first found shortest path
289 # if all shortest paths are wanted, use: all_shortest_paths
290 path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
291 except:
292 logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
293 return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
294
295 logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
296
297 current_hop = src_sw
298 switch_inport_nr = src_sw_inport_nr
299
300 # choose free vlan if path contains more than 1 switch
301 cmd = kwargs.get('cmd')
302 vlan = None
303 if cmd == 'add-flow':
304 if len(path) > 1:
305 vlan = self.vlans.pop()
306
307 for i in range(0,len(path)):
308 current_node = self.getNodeByName(current_hop)
309
310 if path.index(current_hop) < len(path)-1:
311 next_hop = path[path.index(current_hop)+1]
312 else:
313 #last switch reached
314 next_hop = vnf_dst_name
315
316 next_node = self.getNodeByName(next_hop)
317
318 if next_hop == vnf_dst_name:
319 switch_outport_nr = dst_sw_outport_nr
320 logging.info("end node reached: {0}".format(vnf_dst_name))
321 elif not isinstance( next_node, OVSSwitch ):
322 logging.info("Next node: {0} is not a switch".format(next_hop))
323 return "Next node: {0} is not a switch".format(next_hop)
324 else:
325 # take first link between switches by default
326 index_edge_out = 0
327 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
328
329
330 # set of entry via ovs-ofctl
331 if isinstance( current_node, OVSSwitch ):
332 kwargs['vlan'] = vlan
333 kwargs['path'] = path
334 kwargs['current_hop'] = current_hop
335
336 if self.controller == RemoteController:
337 ## set flow entry via ryu rest api
338 self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
339 else:
340 ## set flow entry via ovs-ofctl
341 self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
342
343
344
345 # take first link between switches by default
346 if isinstance( next_node, OVSSwitch ):
347 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
348 current_hop = next_hop
349
350 return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
351
352 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
353 match = 'in_port=%s' % switch_inport_nr
354
355 cookie = kwargs.get('cookie')
356 match_input = kwargs.get('match')
357 cmd = kwargs.get('cmd')
358 path = kwargs.get('path')
359 current_hop = kwargs.get('current_hop')
360 vlan = kwargs.get('vlan')
361
362 s = ','
363 if match_input:
364 match = s.join([match, match_input])
365
366 flow = {}
367 flow['dpid'] = int(node.dpid, 16)
368 logging.info('node name:{0}'.format(node.name))
369
370 if cookie:
371 flow['cookie'] = int(cookie)
372
373
374 flow['actions'] = []
375
376 # possible Ryu actions, match fields:
377 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
378 if cmd == 'add-flow':
379 prefix = 'stats/flowentry/add'
380 if vlan != None:
381 if path.index(current_hop) == 0: # first node
382 action = {}
383 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
384 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
385 flow['actions'].append(action)
386 action = {}
387 action['type'] = 'SET_FIELD'
388 action['field'] = 'vlan_vid'
389 action['value'] = vlan
390 flow['actions'].append(action)
391 elif path.index(current_hop) == len(path) - 1: # last node
392 match += ',dl_vlan=%s' % vlan
393 action = {}
394 action['type'] = 'POP_VLAN'
395 flow['actions'].append(action)
396 else: # middle nodes
397 match += ',dl_vlan=%s' % vlan
398 # output action must come last
399 action = {}
400 action['type'] = 'OUTPUT'
401 action['port'] = switch_outport_nr
402 flow['actions'].append(action)
403 #flow['match'] = self._parse_match(match)
404 elif cmd == 'del-flows':
405 #del(flow['actions'])
406 prefix = 'stats/flowentry/delete'
407 if cookie:
408 flow['cookie_mask'] = cookie
409 #if cookie is None:
410 # flow['match'] = self._parse_match(match)
411
412 action = {}
413 action['type'] = 'OUTPUT'
414 action['port'] = switch_outport_nr
415 flow['actions'].append(action)
416
417 flow['match'] = self._parse_match(match)
418 self.ryu_REST(prefix, data=flow)
419
420 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
421 match = 'in_port=%s' % switch_inport_nr
422
423 cookie = kwargs.get('cookie')
424 match_input = kwargs.get('match')
425 cmd = kwargs.get('cmd')
426 path = kwargs.get('path')
427 current_hop = kwargs.get('current_hop')
428 vlan = kwargs.get('vlan')
429
430 s = ','
431 if cookie:
432 cookie = 'cookie=%s' % cookie
433 match = s.join([cookie, match])
434 if match_input:
435 match = s.join([match, match_input])
436 if cmd == 'add-flow':
437 action = 'action=%s' % switch_outport_nr
438 if vlan != None:
439 if path.index(current_hop) == 0: # first node
440 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
441 match = '-O OpenFlow13 ' + match
442 elif path.index(current_hop) == len(path) - 1: # last node
443 match += ',dl_vlan=%s' % vlan
444 action = 'action=strip_vlan,output=%s' % switch_outport_nr
445 else: # middle nodes
446 match += ',dl_vlan=%s' % vlan
447 ofcmd = s.join([match, action])
448 elif cmd == 'del-flows':
449 ofcmd = match
450 else:
451 ofcmd = ''
452
453 node.dpctl(cmd, ofcmd)
454 logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
455 switch_outport_nr, cmd))
456
457 # start Ryu Openflow controller as Remote Controller for the DCNetwork
458 def startRyu(self, learning_switch=True):
459 # start Ryu controller with rest-API
460 python_install_path = site.getsitepackages()[0]
461 ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
462 ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
463 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
464 # Ryu still uses 6633 as default
465 ryu_option = '--ofp-tcp-listen-port'
466 ryu_of_port = '6653'
467 ryu_cmd = 'ryu-manager'
468 FNULL = open("/tmp/ryu.log", 'w')
469 if learning_switch:
470 self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
471 else:
472 # no learning switch
473 self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
474 time.sleep(1)
475
476 def stopRyu(self):
477 if self.ryu_process is not None:
478 self.ryu_process.terminate()
479 self.ryu_process.kill()
480
481 def ryu_REST(self, prefix, dpid=None, data=None):
482 try:
483 if dpid:
484 url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)
485 else:
486 url = self.ryu_REST_api + '/' + str(prefix)
487 if data:
488 #logging.info('POST: {0}'.format(str(data)))
489 req = urllib2.Request(url, str(data))
490 else:
491 req = urllib2.Request(url)
492
493 ret = urllib2.urlopen(req).read()
494 return ret
495 except:
496 logging.info('error url: {0}'.format(str(url)))
497 if data: logging.info('error POST: {0}'.format(str(data)))
498
499 # need to respect that some match fields must be integers
500 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
501 def _parse_match(self, match):
502 matches = match.split(',')
503 dict = {}
504 for m in matches:
505 match = m.split('=')
506 if len(match) == 2:
507 try:
508 m2 = int(match[1], 0)
509 except:
510 m2 = match[1]
511
512 dict.update({match[0]:m2})
513 return dict
514