2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
9 from subprocess
import Popen
13 from functools
import partial
15 from mininet
.net
import Containernet
16 from mininet
.node
import Controller
, DefaultController
, OVSSwitch
, OVSKernelSwitch
, Docker
, RemoteController
17 from mininet
.cli
import CLI
18 from mininet
.link
import TCLink
20 from emuvim
.dcemulator
.monitoring
import DCNetworkMonitor
21 from emuvim
.dcemulator
.node
import Datacenter
, EmulatorCompute
22 from emuvim
.dcemulator
.resourcemodel
import ResourceModelRegistrar
24 class DCNetwork(Containernet
):
26 Wraps the original Mininet/Containernet class and provides
27 methods to add data centers, switches, etc.
29 This class is used by topology definition scripts.
32 def __init__(self
, controller
=RemoteController
, monitor
=False,
33 enable_learning
= True, # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
34 dc_emulation_max_cpu
=1.0, # fraction of overall CPU time for emulation
35 dc_emulation_max_mem
=512, # emulation max mem in MB
38 Create an extended version of a Containernet network
39 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
40 :param kwargs: path through for Mininet parameters
45 # call original Docker.__init__ and setup default controller
46 Containernet
.__init
__(
47 self
, switch
=OVSKernelSwitch
, controller
=controller
, **kwargs
)
51 self
.ryu_process
= None
52 if controller
== RemoteController
:
53 # start Ryu controller
54 self
.startRyu(learning_switch
=enable_learning
)
56 # add the specified controller
57 self
.addController('c0', controller
=controller
)
59 # graph of the complete DC network
60 self
.DCNetwork_graph
= nx
.MultiDiGraph()
62 # initialize pool of vlan tags to setup the SDN paths
63 self
.vlans
= range(4096)[::-1]
65 # link to Ryu REST_API
68 self
.ryu_REST_api
= 'http://{0}:{1}'.format(ryu_ip
, ryu_port
)
72 self
.monitor_agent
= DCNetworkMonitor(self
)
74 self
.monitor_agent
= None
76 # initialize resource model registrar
77 self
.rm_registrar
= ResourceModelRegistrar(
78 dc_emulation_max_cpu
, dc_emulation_max_mem
)
80 def addDatacenter(self
, label
, metadata
={}, resource_log_path
=None):
82 Create and add a logical cloud data center to the network.
85 raise Exception("Data center label already exists: %s" % label
)
86 dc
= Datacenter(label
, metadata
=metadata
, resource_log_path
=resource_log_path
)
87 dc
.net
= self
# set reference to network
89 dc
.create() # finally create the data center in our Mininet instance
90 logging
.info("added data center: %s" % label
)
93 def addLink(self
, node1
, node2
, **params
):
95 Able to handle Datacenter objects as link
98 assert node1
is not None
99 assert node2
is not None
100 logging
.debug("addLink: n1=%s n2=%s" % (str(node1
), str(node2
)))
101 # ensure type of node1
102 if isinstance( node1
, basestring
):
103 if node1
in self
.dcs
:
104 node1
= self
.dcs
[node1
].switch
105 if isinstance( node1
, Datacenter
):
107 # ensure type of node2
108 if isinstance( node2
, basestring
):
109 if node2
in self
.dcs
:
110 node2
= self
.dcs
[node2
].switch
111 if isinstance( node2
, Datacenter
):
113 # try to give containers a default IP
114 if isinstance( node1
, Docker
):
115 if "params1" not in params
:
116 params
["params1"] = {}
117 if "ip" not in params
["params1"]:
118 params
["params1"]["ip"] = self
.getNextIp()
119 if isinstance( node2
, Docker
):
120 if "params2" not in params
:
121 params
["params2"] = {}
122 if "ip" not in params
["params2"]:
123 params
["params2"]["ip"] = self
.getNextIp()
124 # ensure that we allow TCLinks between data centers
125 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
126 # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
127 if "cls" not in params
:
128 params
["cls"] = TCLink
130 link
= Containernet
.addLink(self
, node1
, node2
, **params
)
132 # try to give container interfaces a default id
133 node1_port_id
= node1
.ports
[link
.intf1
]
134 if isinstance(node1
, Docker
):
135 if "id" in params
["params1"]:
136 node1_port_id
= params
["params1"]["id"]
137 node1_port_name
= link
.intf1
.name
139 node2_port_id
= node2
.ports
[link
.intf2
]
140 if isinstance(node2
, Docker
):
141 if "id" in params
["params2"]:
142 node2_port_id
= params
["params2"]["id"]
143 node2_port_name
= link
.intf2
.name
146 # add edge and assigned port number to graph in both directions between node1 and node2
147 # port_id: id given in descriptor (if available, otherwise same as port)
148 # port: portnumber assigned by Containernet
151 # possible weight metrics allowed by TClink class:
152 weight_metrics
= ['bw', 'delay', 'jitter', 'loss']
153 edge_attributes
= [p
for p
in params
if p
in weight_metrics
]
154 for attr
in edge_attributes
:
155 # if delay: strip ms (need number as weight in graph)
156 match
= re
.search('([0-9]*\.?[0-9]+)', params
[attr
])
158 attr_number
= match
.group(1)
161 attr_dict
[attr
] = attr_number
164 attr_dict2
= {'src_port_id': node1_port_id
, 'src_port_nr': node1
.ports
[link
.intf1
],
165 'src_port_name': node1_port_name
,
166 'dst_port_id': node2_port_id
, 'dst_port_nr': node2
.ports
[link
.intf2
],
167 'dst_port_name': node2_port_name
}
168 attr_dict2
.update(attr_dict
)
169 self
.DCNetwork_graph
.add_edge(node1
.name
, node2
.name
, attr_dict
=attr_dict2
)
171 attr_dict2
= {'src_port_id': node2_port_id
, 'src_port_nr': node2
.ports
[link
.intf2
],
172 'src_port_name': node2_port_name
,
173 'dst_port_id': node1_port_id
, 'dst_port_nr': node1
.ports
[link
.intf1
],
174 'dst_port_name': node1_port_name
}
175 attr_dict2
.update(attr_dict
)
176 self
.DCNetwork_graph
.add_edge(node2
.name
, node1
.name
, attr_dict
=attr_dict2
)
def addDocker(self, label, **params):
    """
    Wrapper for Containernet's addDocker that uses the custom container class.

    The label is first added as a node to the DC network graph, then the
    container is created through Containernet with cls=EmulatorCompute.

    :param label: name of the container; also used as the graph node name
    :param params: keyword arguments handed through to Containernet.addDocker
    :return: the value returned by Containernet.addDocker
    """
    graph = self.DCNetwork_graph
    graph.add_node(label)
    container = Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
    return container
def removeDocker(self, label, **params):
    """
    Wrapper for Containernet's removeDocker that keeps the graph in sync.

    The node named ``label`` is removed from the DC network graph before
    the removal is delegated to Containernet.

    :param label: name of the container / graph node to remove
    :param params: keyword arguments handed through to Containernet.removeDocker
    :return: the value returned by Containernet.removeDocker
    """
    graph = self.DCNetwork_graph
    graph.remove_node(label)
    result = Containernet.removeDocker(self, label, **params)
    return result
def addSwitch(self, name, add_to_graph=True, **params):
    """
    Wrapper for Containernet's addSwitch that also stores the switch in the graph.

    :param name: switch name; used as the graph node name as well
    :param add_to_graph: when true, add the switch as a node to the DC network graph
    :param params: keyword arguments handed through to Containernet.addSwitch
    :return: the value returned by Containernet.addSwitch
    """
    # NOTE(review): the guard on add_to_graph is inferred from the parameter
    # (nothing else in this method reads it) -- confirm against the full file.
    if add_to_graph:
        self.DCNetwork_graph.add_node(name)

    # every switch is created with the same fixed set of OpenFlow versions
    openflow_versions = 'OpenFlow10,OpenFlow12,OpenFlow13'
    return Containernet.addSwitch(self, name, protocols=openflow_versions, **params)
def getAllContainers(self):
    """
    Returns a list with all containers within all data centers.

    Every data center registered in ``self.dcs`` is asked for its compute
    instances via ``listCompute()`` and the results are concatenated.

    :return: a new list holding the entries of every data center's
             listCompute() result (empty if there are no data centers)
    """
    # values() instead of itervalues(): itervalues() only exists on
    # Python 2 dicts, values() iterates the same objects on 2 and 3.
    return [
        container
        for dc in self.dcs.values()
        for container in dc.listCompute()
    ]
213 for dc
in self
.dcs
.itervalues():
215 Containernet
.start(self
)
219 # stop the monitor agent
220 if self
.monitor_agent
is not None:
221 self
.monitor_agent
.stop()
224 Containernet
.stop(self
)
226 # stop Ryu controller
233 # to remove chain do setChain( src, dst, cmd='del-flows')
def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Install or remove a chain (forwarding path) between two VNFs.

    The command is selected with the ``cmd`` keyword argument:
    'add-flow' sets the chain up, 'del-flows' removes it. Both commands
    are dispatched to _chainAddFlow, which reads ``cmd`` from the keyword
    arguments itself.

    :param vnf_src_name: name of the source VNF
    :param vnf_dst_name: name of the destination VNF
    :param vnf_src_interface: source interface, forwarded to _chainAddFlow
    :param vnf_dst_interface: destination interface, forwarded to _chainAddFlow
    :param kwargs: ``cmd`` selects the operation; a truthy ``bidirectional``
                   additionally processes the reverse direction; all keyword
                   arguments are forwarded to _chainAddFlow
    :return: the message returned by _chainAddFlow (for bidirectional chains
             both messages joined by a newline), or "Command unknown" if
             ``cmd`` is neither 'add-flow' nor 'del-flows'
    """
    cmd = kwargs.get('cmd')

    # TODO: del-flow to be implemented
    if cmd not in ('add-flow', 'del-flows'):
        return "Command unknown"

    ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
    if kwargs.get('bidirectional'):
        # same call with source and destination swapped for the way back
        reverse = self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
        return ret + '\n' + reverse

    # always hand the single-direction result back to the caller
    return ret
250 def _chainAddFlow(self
, vnf_src_name
, vnf_dst_name
, vnf_src_interface
=None, vnf_dst_interface
=None, **kwargs
):
252 # TODO: this needs to be cleaned up
253 #check if port is specified (vnf:port)
254 if vnf_src_interface
is None:
255 # take first interface by default
256 connected_sw
= self
.DCNetwork_graph
.neighbors(vnf_src_name
)[0]
257 link_dict
= self
.DCNetwork_graph
[vnf_src_name
][connected_sw
]
258 vnf_src_interface
= link_dict
[0]['src_port_id']
260 for connected_sw
in self
.DCNetwork_graph
.neighbors(vnf_src_name
):
261 link_dict
= self
.DCNetwork_graph
[vnf_src_name
][connected_sw
]
262 for link
in link_dict
:
263 if link_dict
[link
]['src_port_id'] == vnf_src_interface
:
264 # found the right link and connected switch
265 src_sw
= connected_sw
267 src_sw_inport_nr
= link_dict
[link
]['dst_port_nr']
270 if vnf_dst_interface
is None:
271 # take first interface by default
272 connected_sw
= self
.DCNetwork_graph
.neighbors(vnf_dst_name
)[0]
273 link_dict
= self
.DCNetwork_graph
[connected_sw
][vnf_dst_name
]
274 vnf_dst_interface
= link_dict
[0]['dst_port_id']
276 vnf_dst_name
= vnf_dst_name
.split(':')[0]
277 for connected_sw
in self
.DCNetwork_graph
.neighbors(vnf_dst_name
):
278 link_dict
= self
.DCNetwork_graph
[connected_sw
][vnf_dst_name
]
279 for link
in link_dict
:
280 if link_dict
[link
]['dst_port_id'] == vnf_dst_interface
:
281 # found the right link and connected switch
282 dst_sw
= connected_sw
283 dst_sw_outport_nr
= link_dict
[link
]['src_port_nr']
289 # returns the first found shortest path
290 # if all shortest paths are wanted, use: all_shortest_paths
291 path
= nx
.shortest_path(self
.DCNetwork_graph
, src_sw
, dst_sw
, weight
=kwargs
.get('weight'))
293 logging
.info("No path could be found between {0} and {1}".format(vnf_src_name
, vnf_dst_name
))
294 return "No path could be found between {0} and {1}".format(vnf_src_name
, vnf_dst_name
)
296 logging
.info("Path between {0} and {1}: {2}".format(vnf_src_name
, vnf_dst_name
, path
))
299 switch_inport_nr
= src_sw_inport_nr
301 # choose free vlan if path contains more than 1 switch
302 cmd
= kwargs
.get('cmd')
304 if cmd
== 'add-flow':
306 vlan
= self
.vlans
.pop()
308 for i
in range(0,len(path
)):
309 current_node
= self
.getNodeByName(current_hop
)
311 if path
.index(current_hop
) < len(path
)-1:
312 next_hop
= path
[path
.index(current_hop
)+1]
315 next_hop
= vnf_dst_name
317 next_node
= self
.getNodeByName(next_hop
)
319 if next_hop
== vnf_dst_name
:
320 switch_outport_nr
= dst_sw_outport_nr
321 logging
.info("end node reached: {0}".format(vnf_dst_name
))
322 elif not isinstance( next_node
, OVSSwitch
):
323 logging
.info("Next node: {0} is not a switch".format(next_hop
))
324 return "Next node: {0} is not a switch".format(next_hop
)
326 # take first link between switches by default
328 switch_outport_nr
= self
.DCNetwork_graph
[current_hop
][next_hop
][index_edge_out
]['src_port_nr']
331 # set of entry via ovs-ofctl
332 if isinstance( current_node
, OVSSwitch
):
333 kwargs
['vlan'] = vlan
334 kwargs
['path'] = path
335 kwargs
['current_hop'] = current_hop
337 if self
.controller
== RemoteController
:
338 ## set flow entry via ryu rest api
339 self
._set
_flow
_entry
_ryu
_rest
(current_node
, switch_inport_nr
, switch_outport_nr
, **kwargs
)
341 ## set flow entry via ovs-ofctl
342 self
._set
_flow
_entry
_dpctl
(current_node
, switch_inport_nr
, switch_outport_nr
, **kwargs
)
346 # take first link between switches by default
347 if isinstance( next_node
, OVSSwitch
):
348 switch_inport_nr
= self
.DCNetwork_graph
[current_hop
][next_hop
][0]['dst_port_nr']
349 current_hop
= next_hop
351 return "path {2} between {0} and {1}".format(vnf_src_name
, vnf_dst_name
, cmd
)
353 def _set_flow_entry_ryu_rest(self
, node
, switch_inport_nr
, switch_outport_nr
, **kwargs
):
354 match
= 'in_port=%s' % switch_inport_nr
356 cookie
= kwargs
.get('cookie')
357 match_input
= kwargs
.get('match')
358 cmd
= kwargs
.get('cmd')
359 path
= kwargs
.get('path')
360 current_hop
= kwargs
.get('current_hop')
361 vlan
= kwargs
.get('vlan')
365 match
= s
.join([match
, match_input
])
368 flow
['dpid'] = int(node
.dpid
, 16)
371 flow
['cookie'] = int(cookie
)
376 # possible Ryu actions, match fields:
377 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
378 if cmd
== 'add-flow':
379 prefix
= 'stats/flowentry/add'
381 if path
.index(current_hop
) == 0: # first node
383 action
['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
384 action
['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
385 flow
['actions'].append(action
)
387 action
['type'] = 'SET_FIELD'
388 action
['field'] = 'vlan_vid'
389 action
['value'] = vlan
390 flow
['actions'].append(action
)
391 elif path
.index(current_hop
) == len(path
) - 1: # last node
392 match
+= ',dl_vlan=%s' % vlan
394 action
['type'] = 'POP_VLAN'
395 flow
['actions'].append(action
)
397 match
+= ',dl_vlan=%s' % vlan
398 # output action must come last
400 action
['type'] = 'OUTPUT'
401 action
['port'] = switch_outport_nr
402 flow
['actions'].append(action
)
404 elif cmd
== 'del-flows':
405 prefix
= 'stats/flowentry/delete'
407 # if cookie is given, only delete flows by cookie
408 # do not specify other match -> also other cookies can be matched
410 flow
['cookie_mask'] = int('0xffffffffffffffff', 16) # need full mask to match complete cookie
413 action
['type'] = 'OUTPUT'
414 action
['port'] = switch_outport_nr
415 flow
['actions'].append(action
)
417 flow
['match'] = self
._parse
_match
(match
)
418 self
.ryu_REST(prefix
, data
=flow
)
420 def _set_flow_entry_dpctl(self
, node
, switch_inport_nr
, switch_outport_nr
, **kwargs
):
421 match
= 'in_port=%s' % switch_inport_nr
423 cookie
= kwargs
.get('cookie')
424 match_input
= kwargs
.get('match')
425 cmd
= kwargs
.get('cmd')
426 path
= kwargs
.get('path')
427 current_hop
= kwargs
.get('current_hop')
428 vlan
= kwargs
.get('vlan')
432 cookie
= 'cookie=%s' % cookie
433 match
= s
.join([cookie
, match
])
435 match
= s
.join([match
, match_input
])
436 if cmd
== 'add-flow':
437 action
= 'action=%s' % switch_outport_nr
439 if path
.index(current_hop
) == 0: # first node
440 action
= ('action=mod_vlan_vid:%s' % vlan
) + (',output=%s' % switch_outport_nr
)
441 match
= '-O OpenFlow13 ' + match
442 elif path
.index(current_hop
) == len(path
) - 1: # last node
443 match
+= ',dl_vlan=%s' % vlan
444 action
= 'action=strip_vlan,output=%s' % switch_outport_nr
446 match
+= ',dl_vlan=%s' % vlan
447 ofcmd
= s
.join([match
, action
])
448 elif cmd
== 'del-flows':
453 node
.dpctl(cmd
, ofcmd
)
454 logging
.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node
.name
, switch_inport_nr
,
455 switch_outport_nr
, cmd
))
457 # start Ryu Openflow controller as Remote Controller for the DCNetwork
458 def startRyu(self
, learning_switch
=True):
459 # start Ryu controller with rest-API
460 python_install_path
= site
.getsitepackages()[0]
461 ryu_path
= python_install_path
+ '/ryu/app/simple_switch_13.py'
462 ryu_path2
= python_install_path
+ '/ryu/app/ofctl_rest.py'
463 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
464 # Ryu still uses 6633 as default
465 ryu_option
= '--ofp-tcp-listen-port'
467 ryu_cmd
= 'ryu-manager'
468 FNULL
= open("/tmp/ryu.log", 'w')
470 self
.ryu_process
= Popen([ryu_cmd
, ryu_path
, ryu_path2
, ryu_option
, ryu_of_port
], stdout
=FNULL
, stderr
=FNULL
)
473 self
.ryu_process
= Popen([ryu_cmd
, ryu_path2
, ryu_option
, ryu_of_port
], stdout
=FNULL
, stderr
=FNULL
)
477 if self
.ryu_process
is not None:
478 self
.ryu_process
.terminate()
479 self
.ryu_process
.kill()
481 def ryu_REST(self
, prefix
, dpid
=None, data
=None):
482 if data
: logging
.info('log POST: {0}'.format(str(data
)))
485 url
= self
.ryu_REST_api
+ '/' + str(prefix
) + '/' + str(dpid
)
487 url
= self
.ryu_REST_api
+ '/' + str(prefix
)
489 #logging.info('POST: {0}'.format(str(data)))
490 req
= urllib2
.Request(url
, str(data
))
492 req
= urllib2
.Request(url
)
494 ret
= urllib2
.urlopen(req
).read()
497 logging
.info('error url: {0}'.format(str(url
)))
498 if data
: logging
.info('error POST: {0}'.format(str(data
)))
500 # need to respect that some match fields must be integers
501 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
502 def _parse_match(self
, match
):
503 matches
= match
.split(',')
509 m2
= int(match
[1], 0)
513 dict.update({match
[0]:m2
})