47b4a7ef99093745162b4b50c2c28d9043f30c1e
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
9 from subprocess
import Popen
12 from mininet
.net
import Dockernet
13 from mininet
.node
import Controller
, DefaultController
, OVSSwitch
, OVSKernelSwitch
, Docker
, RemoteController
14 from mininet
.cli
import CLI
15 from mininet
.link
import TCLink
17 from emuvim
.dcemulator
.monitoring
import DCNetworkMonitor
18 from emuvim
.dcemulator
.node
import Datacenter
, EmulatorCompute
19 from emuvim
.dcemulator
.resourcemodel
import ResourceModelRegistrar
22 class DCNetwork(Dockernet
):
24 Wraps the original Mininet/Dockernet class and provides
25 methods to add data centers, switches, etc.
27 This class is used by topology definition scripts.
30 def __init__(self
, controller
=RemoteController
,
31 dc_emulation_max_cpu
=1.0, # fraction of overall CPU time for emulation
32 dc_emulation_max_mem
=512, # emulation max mem in MB
35 Create an extended version of a Dockernet network
36 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
37 :param kwargs: pass-through for Mininet parameters
42 # call original Dockernet.__init__ and set up default controller
44 self
, switch
=OVSKernelSwitch
, **kwargs
)
47 self
.ryu_process
= None
48 if controller
== RemoteController
:
49 # start Ryu controller
52 # add the specified controller
53 self
.addController('c0', controller
=controller
)
55 # graph of the complete DC network
56 self
.DCNetwork_graph
= nx
.MultiDiGraph()
59 self
.monitor_agent
= DCNetworkMonitor(self
)
61 # initialize resource model registrar
62 self
.rm_registrar
= ResourceModelRegistrar(
63 dc_emulation_max_cpu
, dc_emulation_max_mem
)
65 def addDatacenter(self
, label
, metadata
={}, resource_log_path
=None):
67 Create and add a logical cloud data center to the network.
70 raise Exception("Data center label already exists: %s" % label
)
71 dc
= Datacenter(label
, metadata
=metadata
, resource_log_path
=resource_log_path
)
72 dc
.net
= self
# set reference to network
74 dc
.create() # finally create the data center in our Mininet instance
75 logging
.info("added data center: %s" % label
)
78 def addLink(self
, node1
, node2
, **params
):
80 Able to handle Datacenter objects as link
83 assert node1
is not None
84 assert node2
is not None
85 logging
.debug("addLink: n1=%s n2=%s" % (str(node1
), str(node2
)))
86 # ensure type of node1
87 if isinstance( node1
, basestring
):
89 node1
= self
.dcs
[node1
].switch
90 if isinstance( node1
, Datacenter
):
92 # ensure type of node2
93 if isinstance( node2
, basestring
):
95 node2
= self
.dcs
[node2
].switch
96 if isinstance( node2
, Datacenter
):
98 # try to give containers a default IP
99 if isinstance( node1
, Docker
):
100 if "params1" not in params
:
101 params
["params1"] = {}
102 if "ip" not in params
["params1"]:
103 params
["params1"]["ip"] = self
.getNextIp()
104 if isinstance( node2
, Docker
):
105 if "params2" not in params
:
106 params
["params2"] = {}
107 if "ip" not in params
["params2"]:
108 params
["params2"]["ip"] = self
.getNextIp()
109 # ensure that we allow TCLinks between data centers
110 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
111 # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
112 if "cls" not in params
:
113 params
["cls"] = TCLink
115 link
= Dockernet
.addLink(self
, node1
, node2
, **params
)
117 # try to give container interfaces a default id
118 node1_port_id
= node1
.ports
[link
.intf1
]
119 if isinstance(node1
, Docker
):
120 if "id" in params
["params1"]:
121 node1_port_id
= params
["params1"]["id"]
123 node2_port_id
= node2
.ports
[link
.intf2
]
124 if isinstance(node2
, Docker
):
125 if "id" in params
["params2"]:
126 node2_port_id
= params
["params2"]["id"]
128 # add edge and assigned port number to graph in both directions between node1 and node2
129 # port_id: id given in descriptor (if available, otherwise same as port)
130 # port: portnumber assigned by Dockernet
132 self
.DCNetwork_graph
.add_edge(node1
.name
, node2
.name
,
133 attr_dict
={'src_port_id': node1_port_id
, 'src_port': node1
.ports
[link
.intf1
],
134 'dst_port_id': node2_port_id
, 'dst_port': node2
.ports
[link
.intf2
]})
135 self
.DCNetwork_graph
.add_edge(node2
.name
, node1
.name
,
136 attr_dict
={'src_port_id': node2_port_id
, 'src_port': node2
.ports
[link
.intf2
],
137 'dst_port_id': node1_port_id
, 'dst_port': node1
.ports
[link
.intf1
]})
def addDocker(self, label, **params):
    """
    Add a Docker container to the network using the emulator's own
    container class.

    The container label is first registered as a node in the network
    graph; creation is then delegated to Dockernet.addDocker with
    cls=EmulatorCompute.

    :param label: name of the container (also used as graph node name)
    :param params: additional keyword arguments forwarded to Dockernet.addDocker
    :return: whatever Dockernet.addDocker returns
    """
    graph = self.DCNetwork_graph
    graph.add_node(label)
    container = Dockernet.addDocker(
        self, label, cls=EmulatorCompute, **params)
    return container
def removeDocker(self, label, **params):
    """
    Remove a Docker container from the network and keep the graph in sync.

    The node named by label is removed from the network graph before
    the removal is delegated to Dockernet.removeDocker.

    :param label: name of the container (and of its graph node)
    :param params: additional keyword arguments forwarded to Dockernet.removeDocker
    :return: whatever Dockernet.removeDocker returns
    """
    graph = self.DCNetwork_graph
    graph.remove_node(label)
    result = Dockernet.removeDocker(self, label, **params)
    return result
def addSwitch(self, name, add_to_graph=True, **params):
    """
    Add a switch to the network and, optionally, to the network graph.

    :param name: name of the switch (also used as graph node name)
    :param add_to_graph: when true, register the switch as a graph node
    :param params: additional keyword arguments forwarded to Dockernet.addSwitch
    :return: whatever Dockernet.addSwitch returns
    """
    if add_to_graph:
        graph = self.DCNetwork_graph
        graph.add_node(name)
    # OpenFlow versions the switch is asked to support
    of_protocols = 'OpenFlow10,OpenFlow12,OpenFlow13'
    switch = Dockernet.addSwitch(
        self, name, protocols=of_protocols, **params)
    return switch
def getAllContainers(self):
    """
    Collect the containers of every registered data center.

    Iterates over the values of self.dcs and concatenates the result of
    each data center's listCompute() call.

    :return: a single list with all containers of all data centers
    """
    containers = []
    for datacenter in self.dcs.itervalues():
        containers.extend(datacenter.listCompute())
    return containers
174 for dc
in self
.dcs
.itervalues():
176 Dockernet
.start(self
)
179 # stop Ryu controller
186 # to remove chain do setChain( src, dst, cmd='del-flows')
187 def setChain(self
, vnf_src_name
, vnf_dst_name
, cmd
='add-flow'):
189 #check if port is specified (vnf:port)
191 vnf_source_interface
= vnf_src_name
.split(':')[1]
193 # take first interface by default
194 connected_sw
= self
.DCNetwork_graph
.neighbors(vnf_src_name
)[0]
195 link_dict
= self
.DCNetwork_graph
[vnf_src_name
][connected_sw
]
196 vnf_source_interface
= link_dict
[0]['src_port_id']
197 #vnf_source_interface = 0
199 vnf_src_name
= vnf_src_name
.split(':')[0]
200 for connected_sw
in self
.DCNetwork_graph
.neighbors(vnf_src_name
):
201 link_dict
= self
.DCNetwork_graph
[vnf_src_name
][connected_sw
]
202 for link
in link_dict
:
203 #logging.info("{0},{1}".format(link_dict[link],vnf_source_interface))
204 if link_dict
[link
]['src_port_id'] == vnf_source_interface
:
205 # found the right link and connected switch
206 #logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface))
207 src_sw
= connected_sw
209 src_sw_inport
= link_dict
[link
]['dst_port']
213 vnf_dest_interface
= vnf_dst_name
.split(':')[1]
215 # take first interface by default
216 connected_sw
= self
.DCNetwork_graph
.neighbors(vnf_dst_name
)[0]
217 link_dict
= self
.DCNetwork_graph
[connected_sw
][vnf_dst_name
]
218 vnf_dest_interface
= link_dict
[0]['dst_port_id']
219 #vnf_dest_interface = 0
221 vnf_dst_name
= vnf_dst_name
.split(':')[0]
222 for connected_sw
in self
.DCNetwork_graph
.neighbors(vnf_dst_name
):
223 link_dict
= self
.DCNetwork_graph
[connected_sw
][vnf_dst_name
]
224 for link
in link_dict
:
225 if link_dict
[link
]['dst_port_id'] == vnf_dest_interface
:
226 # found the right link and connected switch
227 dst_sw
= connected_sw
228 dst_sw_outport
= link_dict
[link
]['src_port']
233 #path = nx.shortest_path(self.DCNetwork_graph, vnf_src_name, vnf_dst_name)
235 path
= nx
.shortest_path(self
.DCNetwork_graph
, src_sw
, dst_sw
)
237 logging
.info("No path could be found between {0} and {1}".format(vnf_src_name
, vnf_dst_name
))
238 return "No path could be found between {0} and {1}".format(vnf_src_name
, vnf_dst_name
)
240 logging
.info("Path between {0} and {1}: {2}".format(vnf_src_name
, vnf_dst_name
, path
))
242 #current_hop = vnf_src_name
244 switch_inport
= src_sw_inport
246 for i
in range(0,len(path
)):
247 current_node
= self
.getNodeByName(current_hop
)
248 if path
.index(current_hop
) < len(path
)-1:
249 next_hop
= path
[path
.index(current_hop
)+1]
252 next_hop
= vnf_dst_name
254 next_node
= self
.getNodeByName(next_hop
)
256 if next_hop
== vnf_dst_name
:
257 switch_outport
= dst_sw_outport
258 logging
.info("end node reached: {0}".format(vnf_dst_name
))
259 elif not isinstance( next_node
, OVSSwitch
):
260 logging
.info("Next node: {0} is not a switch".format(next_hop
))
261 return "Next node: {0} is not a switch".format(next_hop
)
264 switch_outport
= self
.DCNetwork_graph
[current_hop
][next_hop
][index_edge_out
]['src_port']
266 # take into account that multiple edges are possible between 2 nodes
270 #switch_inport = self.DCNetwork_graph[current_hop][next_hop][index_edge_in]['dst_port']
272 #next2_hop = path[path.index(current_hop)+2]
274 #switch_outport = self.DCNetwork_graph[next_hop][next2_hop][index_edge_out]['src_port']
275 #switch_outport = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port']
277 #logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport, switch_outport))
278 # set flow entry via ovs-ofctl
279 # TODO use rest API of ryu to set flow entries to correct switch dpid
280 if isinstance( current_node
, OVSSwitch
):
281 match
= 'in_port=%s' % switch_inport
284 action
= 'action=%s' % switch_outport
286 ofcmd
= s
.join([match
,action
])
287 elif cmd
=='del-flows':
292 current_node
.dpctl(cmd
, ofcmd
)
293 logging
.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node
.name
, switch_inport
,
296 switch_inport
= self
.DCNetwork_graph
[current_hop
][next_hop
][index_edge_out
]['dst_port']
297 current_hop
= next_hop
299 return "path added between {0} and {1}".format(vnf_src_name
, vnf_dst_name
)
300 #return "destination node: {0} not reached".format(vnf_dst_name)
302 # start Ryu Openflow controller as Remote Controller for the DCNetwork
304 # start Ryu controller with rest-API
305 python_install_path
= site
.getsitepackages()[0]
306 ryu_path
= python_install_path
+ '/ryu/app/simple_switch_13.py'
307 ryu_path2
= python_install_path
+ '/ryu/app/ofctl_rest.py'
308 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
309 # Ryu still uses 6633 as default
310 ryu_option
= '--ofp-tcp-listen-port'
312 ryu_cmd
= 'ryu-manager'
313 FNULL
= open("/tmp/ryu.log", 'w')
314 self
.ryu_process
= Popen([ryu_cmd
, ryu_path
, ryu_path2
, ryu_option
, ryu_of_port
], stdout
=FNULL
, stderr
=FNULL
)
316 #self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
320 if self
.ryu_process
is not None:
321 self
.ryu_process
.terminate()
322 self
.ryu_process
.kill()