Merge pull request #92 from stevenvanrossem/master
[osm/vim-emu.git] src/emuvim/dcemulator/net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12
13
14
15 from mininet.net import Dockernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
class DCNetwork(Dockernet):
    """
    Wraps the original Mininet/Dockernet class and provides
    methods to add data centers, switches, etc.

    This class is used by topology definition scripts.
    """

    def __init__(self, controller=RemoteController, monitor=False,
                 dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
                 dc_emulation_max_mem=512,  # emulation max. memory in MB
                 **kwargs):
        """
        Create an extended version of a Dockernet network.
        :param controller: controller class used for the network (default: RemoteController)
        :param monitor: if True, attach a DCNetworkMonitor agent to the network
        :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
        :param dc_emulation_max_mem: max. memory (in MB) used by containers in data centers
        :param kwargs: passed through to the Mininet/Dockernet constructor
        """
        self.dcs = {}

        # call original Dockernet.__init__ and setup default controller
        Dockernet.__init__(
            self, switch=OVSKernelSwitch, **kwargs)

        # Ryu management
        self.ryu_process = None
        if controller == RemoteController:
            # start a local Ryu instance to act as the remote controller
            self.startRyu()

        # add the specified controller
        self.addController('c0', controller=controller)

        # graph of the complete DC network
        self.DCNetwork_graph = nx.MultiDiGraph()

        # monitoring agent
        if monitor:
            self.monitor_agent = DCNetworkMonitor(self)
        else:
            self.monitor_agent = None

        # initialize resource model registrar
        self.rm_registrar = ResourceModelRegistrar(
            dc_emulation_max_cpu, dc_emulation_max_mem)

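    # Minimal usage sketch (illustrative only, not executed as part of this
    # module; the name "net" is assumed): a topology definition script would
    # typically instantiate the network like this:
    #
    #   net = DCNetwork(monitor=False, dc_emulation_max_cpu=1.0)
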
    def addDatacenter(self, label, metadata=None, resource_log_path=None):
        """
        Create and add a logical cloud data center to the network.
        """
        if metadata is None:
            metadata = {}  # avoid a shared mutable default argument
        if label in self.dcs:
            raise Exception("Data center label already exists: %s" % label)
        dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
        dc.net = self  # set reference to network
        self.dcs[label] = dc
        dc.create()  # finally create the data center in our Mininet instance
        logging.info("added data center: %s" % label)
        return dc

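    # Illustrative sketch (assumed names, not part of the module): a topology
    # script might create two data centers and connect them:
    #
    #   dc1 = net.addDatacenter("datacenter1")
    #   dc2 = net.addDatacenter("datacenter2")
    #   net.addLink(dc1, dc2)
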
    def addLink(self, node1, node2, **params):
        """
        Able to handle Datacenter objects as link
        end points.
        """
        assert node1 is not None
        assert node2 is not None
        logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
        # ensure type of node1
        if isinstance(node1, basestring):
            if node1 in self.dcs:
                node1 = self.dcs[node1].switch
        if isinstance(node1, Datacenter):
            node1 = node1.switch
        # ensure type of node2
        if isinstance(node2, basestring):
            if node2 in self.dcs:
                node2 = self.dcs[node2].switch
        if isinstance(node2, Datacenter):
            node2 = node2.switch
        # try to give containers a default IP
        if isinstance(node1, Docker):
            if "params1" not in params:
                params["params1"] = {}
            if "ip" not in params["params1"]:
                params["params1"]["ip"] = self.getNextIp()
        if isinstance(node2, Docker):
            if "params2" not in params:
                params["params2"] = {}
            if "ip" not in params["params2"]:
                params["params2"]["ip"] = self.getNextIp()
        # ensure that we allow TCLinks between data centers
        # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
        # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
        if "cls" not in params:
            params["cls"] = TCLink

        link = Dockernet.addLink(self, node1, node2, **params)

        # try to give container interfaces a default id
        node1_port_id = node1.ports[link.intf1]
        if isinstance(node1, Docker):
            if "id" in params["params1"]:
                node1_port_id = params["params1"]["id"]

        node2_port_id = node2.ports[link.intf2]
        if isinstance(node2, Docker):
            if "id" in params["params2"]:
                node2_port_id = params["params2"]["id"]

        # add edge and assigned port number to graph in both directions between node1 and node2
        # port_id: id given in descriptor (if available, otherwise same as port)
        # port: port number assigned by Dockernet

        attr_dict = {}
        # possible weight metrics allowed by the TCLink class:
        weight_metrics = ['bw', 'delay', 'jitter', 'loss']
        edge_attributes = [p for p in params if p in weight_metrics]
        for attr in edge_attributes:
            # extract the numeric part, e.g. strip "ms" from a delay value
            # (the graph needs plain numbers as edge weights); str() guards
            # against numeric parameters such as bw=10
            match = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
            if match:
                attr_number = match.group(1)
            else:
                attr_number = None
            attr_dict[attr] = attr_number

        attr_dict2 = {'src_port_id': node1_port_id, 'src_port': node1.ports[link.intf1],
                      'dst_port_id': node2_port_id, 'dst_port': node2.ports[link.intf2]}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)

        attr_dict2 = {'src_port_id': node2_port_id, 'src_port': node2.ports[link.intf2],
                      'dst_port_id': node1_port_id, 'dst_port': node1.ports[link.intf1]}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)

        return link

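    # Illustrative sketch (assumed names): link parameters accepted by TCLink
    # become numeric edge attributes in DCNetwork_graph, e.g.:
    #
    #   net.addLink(dc1, dc2, delay="10ms", bw=10)  # 10 ms delay, 10 Mbit/s
    #
    # Here the "10ms" string is stripped to the number 10 before it is stored
    # as an edge weight.
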
    def addDocker(self, label, **params):
        """
        Wrapper for addDocker method to use custom container class.
        """
        self.DCNetwork_graph.add_node(label)
        return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)

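    # Illustrative sketch (assumed names; "dimage" is the Dockernet parameter
    # selecting the container image):
    #
    #   vnf1 = net.addDocker("vnf1", dimage="ubuntu:trusty")
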
    def removeDocker(self, label, **params):
        """
        Wrapper for removeDocker method to update graph.
        """
        self.DCNetwork_graph.remove_node(label)
        return Dockernet.removeDocker(self, label, **params)

    def addSwitch(self, name, add_to_graph=True, **params):
        """
        Wrapper for addSwitch method to store switch also in graph.
        """
        if add_to_graph:
            self.DCNetwork_graph.add_node(name)
        return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)

    def getAllContainers(self):
        """
        Returns a list with all containers within all data centers.
        """
        all_containers = []
        for dc in self.dcs.itervalues():
            all_containers += dc.listCompute()
        return all_containers

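    # Illustrative sketch (assumed names):
    #
    #   for c in net.getAllContainers():
    #       print(c.name)
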
    def start(self):
        # start the data centers before the underlying Dockernet network
        for dc in self.dcs.itervalues():
            dc.start()
        Dockernet.start(self)

    def stop(self):
        # stop the monitor agent
        if self.monitor_agent is not None:
            self.monitor_agent.stop()

        # stop emulator net
        Dockernet.stop(self)

        # stop Ryu controller
        self.stopRyu()

    def CLI(self):
        CLI(self)

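    # Typical lifecycle in a topology script (illustrative; "net" is an
    # assumed name):
    #
    #   net.start()  # starts data centers, containers, and the Mininet core
    #   net.CLI()    # drop into the interactive Mininet CLI
    #   net.stop()   # tear everything down, including the Ryu controller
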
    # to remove a chain, call setChain(src, dst, cmd='del-flows')
    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, cmd='add-flow', weight=None):
        # check if port is specified (vnf:port)
        if vnf_src_interface is None:
            # take first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            vnf_src_interface = link_dict[0]['src_port_id']

        # find the switch attached to the source VNF and the switch port it uses
        src_sw = None
        src_sw_inport = None
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_src_interface:
                    # found the right link and connected switch
                    src_sw = connected_sw
                    src_sw_inport = link_dict[link]['dst_port']
                    break

        if vnf_dst_interface is None:
            # take first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            vnf_dst_interface = link_dict[0]['dst_port_id']

        # find the switch attached to the destination VNF and its output port
        vnf_dst_name = vnf_dst_name.split(':')[0]
        dst_sw = None
        dst_sw_outport = None
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            for link in link_dict:
                if link_dict[link]['dst_port_id'] == vnf_dst_interface:
                    # found the right link and connected switch
                    dst_sw = connected_sw
                    dst_sw_outport = link_dict[link]['src_port']
                    break

        # abort if the attachment switches could not be resolved
        if src_sw is None or dst_sw is None:
            logging.warning("Could not find attachment switches for {0} and {1}".format(vnf_src_name, vnf_dst_name))
            return "Could not find attachment switches for {0} and {1}".format(vnf_src_name, vnf_dst_name)

        # get shortest path
        try:
            # returns the first found shortest path
            # if all shortest paths are wanted, use: all_shortest_paths
            path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=weight)
        except Exception:
            logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
            return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

        current_hop = src_sw
        switch_inport = src_sw_inport

        # walk the path hop by hop; current_hop is always path[i]
        for i in range(len(path)):
            current_node = self.getNodeByName(current_hop)
            if i < len(path) - 1:
                next_hop = path[i + 1]
            else:
                # last switch reached
                next_hop = vnf_dst_name

            next_node = self.getNodeByName(next_hop)

            if next_hop == vnf_dst_name:
                switch_outport = dst_sw_outport
                logging.info("end node reached: {0}".format(vnf_dst_name))
            elif not isinstance(next_node, OVSSwitch):
                logging.info("Next node: {0} is not a switch".format(next_hop))
                return "Next node: {0} is not a switch".format(next_hop)
            else:
                # take first link between switches by default
                index_edge_out = 0
                switch_outport = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port']

            # set flow entry via ovs-ofctl
            # TODO use the REST API of Ryu to set flow entries on the correct dpid
            # TODO this only maps in_port to out_port without further matching,
            # so it will cause conflicts when multiple services are deployed...
            # TODO need multiple matches to do this (VLAN tags)
            if isinstance(current_node, OVSSwitch):
                match = 'in_port=%s' % switch_inport

                if cmd == 'add-flow':
                    action = 'action=%s' % switch_outport
                    s = ','
                    ofcmd = s.join([match, action])
                elif cmd == 'del-flows':
                    ofcmd = match
                else:
                    ofcmd = ''

                current_node.dpctl(cmd, ofcmd)
                logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport,
                                                                                         switch_outport))
            # take first link between switches by default
            if isinstance(next_node, OVSSwitch):
                switch_inport = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port']
            current_hop = next_hop

        return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name)
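    # Illustrative sketch (assumed VNF names): install a forwarding chain
    # between two deployed containers, then remove it again:
    #
    #   net.setChain('vnf1', 'vnf2')                   # install flow entries
    #   net.setChain('vnf1', 'vnf2', cmd='del-flows')  # remove them again
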
    # start Ryu OpenFlow controller as Remote Controller for the DCNetwork
    def startRyu(self):
        # start Ryu controller with REST API
        python_install_path = site.getsitepackages()[0]
        ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
        ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
        # change the default OpenFlow controller port to 6653 (official IANA-assigned port number), as used by Mininet
        # Ryu still uses 6633 as default
        ryu_option = '--ofp-tcp-listen-port'
        ryu_of_port = '6653'
        ryu_cmd = 'ryu-manager'
        # redirect Ryu's output to a log file
        ryu_log = open("/tmp/ryu.log", 'w')
        #self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=ryu_log, stderr=ryu_log)
        # start only the REST API app, no learning switch
        self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=ryu_log, stderr=ryu_log)
        time.sleep(1)

    def stopRyu(self):
        if self.ryu_process is not None:
            self.ryu_process.terminate()
            self.ryu_process.kill()