ec42b669e894d0a4bd52a976e5a03b9ea9c823f8
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
9 from subprocess
import Popen
15 from mininet
.net
import Dockernet
16 from mininet
.node
import Controller
, DefaultController
, OVSSwitch
, OVSKernelSwitch
, Docker
, RemoteController
17 from mininet
.cli
import CLI
18 from mininet
.link
import TCLink
20 from emuvim
.dcemulator
.monitoring
import DCNetworkMonitor
21 from emuvim
.dcemulator
.node
import Datacenter
, EmulatorCompute
22 from emuvim
.dcemulator
.resourcemodel
import ResourceModelRegistrar
class DCNetwork(Dockernet):
    """
    Wraps the original Mininet/Dockernet class and provides
    methods to add data centers, switches, etc.

    This class is used by topology definition scripts.
    """

    def __init__(self, controller=RemoteController, monitor=False,
                 dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
                 dc_emulation_max_mem=512,  # emulation max mem in MB
                 **kwargs):
        """
        Create an extended version of a Dockernet network.

        :param controller: controller class registered as 'c0'; if it is
            RemoteController, a Ryu process is started first (see startRyu)
        :param monitor: if true, a DCNetworkMonitor is attached to the network
        :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
        :param dc_emulation_max_mem: max. memory (in MB) used for emulation
        :param kwargs: pass through for Mininet parameters
        """
        # data centers of this network, keyed by their label
        self.dcs = {}

        # call original Dockernet.__init__ and use kernel OVS switches
        Dockernet.__init__(self, switch=OVSKernelSwitch, **kwargs)

        # handle of the Ryu process (stays None if we did not start one)
        self.ryu_process = None
        if controller == RemoteController:
            # start Ryu controller
            self.startRyu()

        # add the specified controller
        self.addController('c0', controller=controller)

        # graph of the complete DC network
        self.DCNetwork_graph = nx.MultiDiGraph()

        # monitoring agent (optional)
        self.monitor_agent = DCNetworkMonitor(self) if monitor else None

        # initialize resource model registrar
        self.rm_registrar = ResourceModelRegistrar(
            dc_emulation_max_cpu, dc_emulation_max_mem)

    def addDatacenter(self, label, metadata=None, resource_log_path=None):
        """
        Create and add a logical cloud data center to the network.

        :param label: unique label of the new data center
        :param metadata: optional dict handed to the Datacenter
            (a fresh empty dict is used if omitted)
        :param resource_log_path: handed to the Datacenter unchanged
        :return: the created Datacenter object
        :raises Exception: if the label is already in use
        """
        if label in self.dcs:
            raise Exception("Data center label already exists: %s" % label)
        if metadata is None:
            # never share one default dict between data centers
            metadata = {}
        dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
        dc.net = self  # set reference to network
        self.dcs[label] = dc
        dc.create()  # finally create the data center in our Mininet instance
        logging.info("added data center: %s" % label)
        return dc

    def _resolveLinkNode(self, node):
        """
        Translate a link end point into a node object: data center labels
        and Datacenter objects map to the data center's switch, any other
        name is looked up in the network.
        """
        if isinstance(node, basestring):
            if node in self.dcs:
                node = self.dcs[node].switch
            else:
                # plain node name: resolve it here, we need the node's
                # port table after the link has been created
                node = self.getNodeByName(node)
        if isinstance(node, Datacenter):
            node = node.switch
        return node

    def addLink(self, node1, node2, **params):
        """
        Able to handle Datacenter objects as link
        end points.

        Creates the link through Dockernet and mirrors it in
        DCNetwork_graph as one edge per direction. Each edge stores the
        port ids, port numbers and interface names of both ends plus the
        numeric part of the link metrics (bw, delay, jitter, loss) given
        in params.

        :param node1: node object, node name, data center label or Datacenter
        :param node2: same as node1, for the other end
        :param params: pass through for the link; 'cls' defaults to TCLink,
            containers get a default 'ip' in params1 / params2
        :return: the created link
        """
        assert node1 is not None
        assert node2 is not None
        logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
        # ensure type of node1 and node2
        node1 = self._resolveLinkNode(node1)
        node2 = self._resolveLinkNode(node2)

        # try to give containers a default IP
        for node, key in ((node1, "params1"), (node2, "params2")):
            if isinstance(node, Docker):
                if key not in params:
                    params[key] = {}
                # only draw an address from the pool when none was given
                if "ip" not in params[key]:
                    params[key]["ip"] = self.getNextIp()

        # ensure that we allow TCLinks between data centers
        # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
        # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
        if "cls" not in params:
            params["cls"] = TCLink

        link = Dockernet.addLink(self, node1, node2, **params)

        # try to give container interfaces a default id
        node1_port_nr = node1.ports[link.intf1]
        node1_port_id = node1_port_nr
        if isinstance(node1, Docker) and "id" in params["params1"]:
            node1_port_id = params["params1"]["id"]
        node1_port_name = link.intf1.name

        node2_port_nr = node2.ports[link.intf2]
        node2_port_id = node2_port_nr
        if isinstance(node2, Docker) and "id" in params["params2"]:
            node2_port_id = params["params2"]["id"]
        node2_port_name = link.intf2.name

        # add edge and assigned port number to graph in both directions between node1 and node2
        # port_id: id given in descriptor (if available, otherwise same as port)
        # port: portnumber assigned by Dockernet

        # possible weight metrics allowed by TClink class:
        weight_metrics = ['bw', 'delay', 'jitter', 'loss']
        attr_dict = {}
        for attr in weight_metrics:
            if attr not in params:
                continue
            # if delay: strip ms (need number as weight in graph);
            # str() because metrics like bw may be given as plain numbers
            match = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
            attr_dict[attr] = float(match.group(1)) if match else None

        forward = {'src_port_id': node1_port_id, 'src_port_nr': node1_port_nr,
                   'src_port_name': node1_port_name,
                   'dst_port_id': node2_port_id, 'dst_port_nr': node2_port_nr,
                   'dst_port_name': node2_port_name}
        forward.update(attr_dict)
        # attributes are passed as keywords: accepted by networkx 1.x and 2.x
        self.DCNetwork_graph.add_edge(node1.name, node2.name, **forward)

        backward = {'src_port_id': node2_port_id, 'src_port_nr': node2_port_nr,
                    'src_port_name': node2_port_name,
                    'dst_port_id': node1_port_id, 'dst_port_nr': node1_port_nr,
                    'dst_port_name': node1_port_name}
        backward.update(attr_dict)
        self.DCNetwork_graph.add_edge(node2.name, node1.name, **backward)

        return link

    def addDocker( self, label, **params ):
        """
        Wrapper for addDocker method to use custom container class.

        The container is also added as a node to DCNetwork_graph.
        """
        self.DCNetwork_graph.add_node(label)
        return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)

    def removeDocker( self, label, **params ):
        """
        Wrapper for removeDocker method to update graph.
        """
        self.DCNetwork_graph.remove_node(label)
        return Dockernet.removeDocker(self, label, **params)

    def addSwitch( self, name, add_to_graph=True, **params ):
        """
        Wrapper for addSwitch method to store switch also in graph.

        :param name: name of the switch
        :param add_to_graph: if false, the switch is not added to DCNetwork_graph
        :param params: pass through for the switch
        """
        if add_to_graph:
            self.DCNetwork_graph.add_node(name)
        return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)

    def getAllContainers(self):
        """
        Returns a list with all containers within all data centers.
        """
        all_containers = []
        for dc in self.dcs.itervalues():
            all_containers += dc.listCompute()
        return all_containers

    def start(self):
        """
        Start all data centers, then the emulated network itself.
        """
        for dc in self.dcs.itervalues():
            dc.start()
        Dockernet.start(self)

    def stop(self):
        """
        Stop the monitor agent (if any), the emulated network and the
        Ryu controller (if one was started by this object).
        """
        # stop the monitor agent
        if self.monitor_agent is not None:
            self.monitor_agent.stop()

        # stop emulator net
        Dockernet.stop(self)

        # stop Ryu controller
        self.stopRyu()

    def _findAttachment(self, vnf_name, port_id, vnf_is_source):
        """
        Find the link of vnf_name whose port id (on the VNF side) equals
        port_id. Returns (neighbor name, port number on the neighbor side)
        of the first match, or (None, None) if there is none.
        """
        graph = self.DCNetwork_graph
        for connected_sw in graph.neighbors(vnf_name):
            if vnf_is_source:
                # edge VNF -> switch: the VNF is the source end
                link_dict = graph[vnf_name][connected_sw]
                vnf_key, sw_key = 'src_port_id', 'dst_port_nr'
            else:
                # edge switch -> VNF: the VNF is the destination end
                link_dict = graph[connected_sw][vnf_name]
                vnf_key, sw_key = 'dst_port_id', 'src_port_nr'
            for link in link_dict:
                if link_dict[link][vnf_key] == port_id:
                    # found the right link and connected switch
                    return connected_sw, link_dict[link][sw_key]
        return None, None

    # to remove chain do setChain( src, dst, cmd='del-flows')
    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, cmd='add-flow',
                 weight=None, **kwargs):
        """
        Set (or remove) flow entries on all switches along a shortest
        path between two VNFs.

        :param vnf_src_name: graph node name of the source VNF
        :param vnf_dst_name: graph node name of the destination VNF
        :param vnf_src_interface: port id of the source interface
            (default: first link of the first neighbor)
        :param vnf_dst_interface: port id of the destination interface
            (default: first link of the first neighbor)
        :param cmd: command handed to the switches' dpctl,
            'add-flow' or 'del-flows'
        :param weight: edge attribute used as weight for the shortest path
        :param kwargs: 'match' may hold additional match entries that are
            appended to the in_port match
        :return: a status message (string)
        """
        graph = self.DCNetwork_graph

        logging.info('vnf_src_if: {0}'.format(vnf_src_interface))
        # check if port is specified (vnf:port)
        if vnf_src_interface is None:
            # take first interface by default
            connected_sw = list(graph.neighbors(vnf_src_name))[0]
            link_dict = graph[vnf_src_name][connected_sw]
            vnf_src_interface = link_dict[0]['src_port_id']

        src_sw, src_sw_inport_nr = self._findAttachment(
            vnf_src_name, vnf_src_interface, True)

        if vnf_dst_interface is None:
            # take first interface by default
            connected_sw = list(graph.neighbors(vnf_dst_name))[0]
            link_dict = graph[connected_sw][vnf_dst_name]
            vnf_dst_interface = link_dict[0]['dst_port_id']

        vnf_dst_name = vnf_dst_name.split(':')[0]
        dst_sw, dst_sw_outport_nr = self._findAttachment(
            vnf_dst_name, vnf_dst_interface, False)

        # get shortest path between the two attachment switches
        path = None
        if src_sw is not None and dst_sw is not None:
            try:
                # returns the first found shortest path
                # if all shortest paths are wanted, use: all_shortest_paths
                path = nx.shortest_path(graph, src_sw, dst_sw, weight=weight)
            except Exception:
                # best effort: any failure is reported as "no path" below
                path = None
        if path is None:
            logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
            return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

        switch_inport_nr = src_sw_inport_nr
        last_index = len(path) - 1

        for index, current_hop in enumerate(path):
            current_node = self.getNodeByName(current_hop)
            # behind the last switch of the path comes the destination VNF
            next_hop = path[index + 1] if index < last_index else vnf_dst_name
            next_node = self.getNodeByName(next_hop)

            if next_hop == vnf_dst_name:
                switch_outport_nr = dst_sw_outport_nr
                logging.info("end node reached: {0}".format(vnf_dst_name))
            elif not isinstance( next_node, OVSSwitch ):
                logging.info("Next node: {0} is not a switch".format(next_hop))
                return "Next node: {0} is not a switch".format(next_hop)
            else:
                # take first link between switches by default
                switch_outport_nr = graph[current_hop][next_hop][0]['src_port_nr']

            # set of entry via ovs-ofctl
            # TODO use rest API of ryu to set flow entries to correct dpid
            # TODO this only sets port in to out, no match, so this will give trouble when multiple services are deployed...
            # TODO need multiple matches to do this (VLAN tags)
            if isinstance( current_node, OVSSwitch ):
                match = 'in_port=%s' % switch_inport_nr
                # add additional match entries from the argument
                match_input = kwargs.get('match')
                logging.info('match input:{0}'.format(match_input))
                if match_input:
                    match = ','.join([match, match_input])

                if cmd == 'add-flow':
                    action = 'action=%s' % switch_outport_nr
                    ofcmd = ','.join([match, action])
                elif cmd == 'del-flows':
                    # deleting needs the match only, no action
                    ofcmd = match
                else:
                    ofcmd = ''

                current_node.dpctl(cmd, ofcmd)
                logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport_nr,
                                                                                     switch_outport_nr))

            if not isinstance( next_node, OVSSwitch ):
                # the destination is reached, nothing left to configure
                break
            # take first link between switches by default
            switch_inport_nr = graph[current_hop][next_hop][0]['dst_port_nr']

        return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    # start Ryu Openflow controller as Remote Controller for the DCNetwork
    def startRyu(self):
        """
        Start ryu-manager with the simple_switch_13 and ofctl_rest apps as
        a child process; its output goes to /tmp/ryu.log.

        NOTE(review): this returns right after spawning the process;
        confirm whether callers need to wait until Ryu accepts connections.
        """
        # start Ryu controller with rest-API
        python_install_path = site.getsitepackages()[0]
        ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
        ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
        # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
        # Ryu still uses 6633 as default
        ryu_option = '--ofp-tcp-listen-port'
        ryu_of_port = '6653'
        ryu_cmd = 'ryu-manager'
        # the child keeps its own copy of the log descriptor, so our
        # handle can be closed as soon as the process is spawned
        with open("/tmp/ryu.log", 'w') as ryu_log:
            self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port],
                                     stdout=ryu_log, stderr=ryu_log)

    def stopRyu(self):
        """
        Stop the Ryu process started by startRyu (no-op if there is none).
        """
        if self.ryu_process is not None:
            self.ryu_process.terminate()
            self.ryu_process.kill()
            # reap the killed child and forget the handle, so a second
            # call can not signal an unrelated process with a reused pid
            self.ryu_process.wait()
            self.ryu_process = None