Merge pull request #95 from stevenvanrossem/master
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12
13
14
15 from mininet.net import Dockernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
class DCNetwork(Dockernet):
    """
    Wraps the original Mininet/Dockernet class and provides
    methods to add data centers, switches, etc.

    This class is used by topology definition scripts.
    """

    def __init__(self, controller=RemoteController, monitor=False,
                 dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
                 dc_emulation_max_mem=512,  # emulation max mem in MB
                 **kwargs):
        """
        Create an extended version of a Dockernet network.
        :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
        :param dc_emulation_max_mem: max. memory (in MB) used by containers in data centers
        :param kwargs: passed through to the underlying Mininet/Dockernet constructor
        :return:
        """
        self.dcs = {}

        # call original Dockernet.__init__ and set up the default controller
        Dockernet.__init__(
            self, switch=OVSKernelSwitch, **kwargs)

        # Ryu management
        self.ryu_process = None
        if controller == RemoteController:
            # start Ryu controller
            self.startRyu()

        # add the specified controller
        self.addController('c0', controller=controller)

        # graph of the complete DC network
        self.DCNetwork_graph = nx.MultiDiGraph()

        # initialize pool of vlan tags to set up the SDN paths
        self.vlans = range(4096)[::-1]

        # monitoring agent
        if monitor:
            self.monitor_agent = DCNetworkMonitor(self)
        else:
            self.monitor_agent = None

        # initialize resource model registrar
        self.rm_registrar = ResourceModelRegistrar(
            dc_emulation_max_cpu, dc_emulation_max_mem)

    def addDatacenter(self, label, metadata={}, resource_log_path=None):
        """
        Create and add a logical cloud data center to the network.
        """
        if label in self.dcs:
            raise Exception("Data center label already exists: %s" % label)
        dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
        dc.net = self  # set reference to network
        self.dcs[label] = dc
        dc.create()  # finally create the data center in our Mininet instance
        logging.info("added data center: %s" % label)
        return dc

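    # Illustrative call (a sketch, not from the original source; the label and
    # metadata keys are arbitrary examples):
    #   dc1 = net.addDatacenter("dc1", metadata={"location": "paderborn"})
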
    def addLink(self, node1, node2, **params):
        """
        Able to handle Datacenter objects as link
        end points.
        """
        assert node1 is not None
        assert node2 is not None
        logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
        # ensure type of node1
        if isinstance( node1, basestring ):
            if node1 in self.dcs:
                node1 = self.dcs[node1].switch
        if isinstance( node1, Datacenter ):
            node1 = node1.switch
        # ensure type of node2
        if isinstance( node2, basestring ):
            if node2 in self.dcs:
                node2 = self.dcs[node2].switch
        if isinstance( node2, Datacenter ):
            node2 = node2.switch
        # try to give containers a default IP
        if isinstance( node1, Docker ):
            if "params1" not in params:
                params["params1"] = {}
            if "ip" not in params["params1"]:
                params["params1"]["ip"] = self.getNextIp()
        if isinstance( node2, Docker ):
            if "params2" not in params:
                params["params2"] = {}
            if "ip" not in params["params2"]:
                params["params2"]["ip"] = self.getNextIp()
        # ensure that we allow TCLinks between data centers
        # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
        # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
        if "cls" not in params:
            params["cls"] = TCLink

        link = Dockernet.addLink(self, node1, node2, **params)

        # try to give container interfaces a default id
        node1_port_id = node1.ports[link.intf1]
        if isinstance(node1, Docker):
            if "id" in params["params1"]:
                node1_port_id = params["params1"]["id"]
        node1_port_name = link.intf1.name

        node2_port_id = node2.ports[link.intf2]
        if isinstance(node2, Docker):
            if "id" in params["params2"]:
                node2_port_id = params["params2"]["id"]
        node2_port_name = link.intf2.name

        # add edge and assigned port number to graph in both directions between node1 and node2
        # port_id: id given in descriptor (if available, otherwise same as port)
        # port: port number assigned by Dockernet

        attr_dict = {}
        # possible weight metrics allowed by the TCLink class:
        weight_metrics = ['bw', 'delay', 'jitter', 'loss']
        edge_attributes = [p for p in params if p in weight_metrics]
        for attr in edge_attributes:
            # if delay: strip the 'ms' suffix (the graph needs a number as weight)
            match = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
            if match:
                attr_number = float(match.group(1))
            else:
                attr_number = None
            attr_dict[attr] = attr_number

        attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
                      'src_port_name': node1_port_name,
                      'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
                      'dst_port_name': node2_port_name}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)

        attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
                      'src_port_name': node2_port_name,
                      'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
                      'dst_port_name': node1_port_name}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)

        return link

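    # Illustrative call (a sketch, not from the original source; assumes data
    # centers labeled "dc1" and "dc2" exist, and uses TCLink parameters listed
    # in weight_metrics above):
    #   net.addLink("dc1", "dc2", delay="10ms", bw=10)
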
    def addDocker( self, label, **params ):
        """
        Wrapper for addDocker method to use custom container class.
        """
        self.DCNetwork_graph.add_node(label)
        return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)

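    # Illustrative call (a sketch, not from the original source; 'image' is a
    # standard Dockernet parameter passed through via **params):
    #   vnf1 = net.addDocker("vnf1", image="ubuntu:trusty")
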
    def removeDocker( self, label, **params ):
        """
        Wrapper for removeDocker method to update graph.
        """
        self.DCNetwork_graph.remove_node(label)
        return Dockernet.removeDocker(self, label, **params)

    def addSwitch( self, name, add_to_graph=True, **params ):
        """
        Wrapper for addSwitch method to store the switch also in the graph.
        """
        if add_to_graph:
            self.DCNetwork_graph.add_node(name)
        return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)

    def getAllContainers(self):
        """
        Returns a list with all containers within all data centers.
        """
        all_containers = []
        for dc in self.dcs.itervalues():
            all_containers += dc.listCompute()
        return all_containers

    def start(self):
        # start all data centers, then the underlying Dockernet network
        for dc in self.dcs.itervalues():
            dc.start()
        Dockernet.start(self)

    def stop(self):

        # stop the monitor agent
        if self.monitor_agent is not None:
            self.monitor_agent.stop()

        # stop emulator net
        Dockernet.stop(self)

        # stop Ryu controller
        self.stopRyu()

    def CLI(self):
        CLI(self)

    # to remove a chain, call setChain( src, dst, cmd='del-flows')
    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
        cmd = kwargs.get('cmd')
        if cmd == 'add-flow':
            ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
            if kwargs.get('bidirectional'):
                return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
            return ret
        elif cmd == 'del-flows':  # TODO: del-flow (single flow) to be implemented
            ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
            if kwargs.get('bidirectional'):
                return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
            return ret
        else:
            return "Command unknown"

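    # Illustrative call (a sketch, not from the original source; VNF names
    # depend on the running topology):
    #   net.setChain('vnf1', 'vnf2', cmd='add-flow', bidirectional=True)
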
    def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):

        # TODO: this needs to be cleaned up
        # check if a port is specified (vnf:port)
        if vnf_src_interface is None:
            # take the first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            vnf_src_interface = link_dict[0]['src_port_id']

        for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_src_interface:
                    # found the right link and connected switch
                    src_sw = connected_sw
                    src_sw_inport_nr = link_dict[link]['dst_port_nr']
                    break

        if vnf_dst_interface is None:
            # take the first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            vnf_dst_interface = link_dict[0]['dst_port_id']

        vnf_dst_name = vnf_dst_name.split(':')[0]
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            for link in link_dict:
                if link_dict[link]['dst_port_id'] == vnf_dst_interface:
                    # found the right link and connected switch
                    dst_sw = connected_sw
                    dst_sw_outport_nr = link_dict[link]['src_port_nr']
                    break

        # get the shortest path
        try:
            # returns the first shortest path found
            # if all shortest paths are wanted, use: all_shortest_paths
            path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
        except:
            logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
            return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

        current_hop = src_sw
        switch_inport_nr = src_sw_inport_nr

        # choose a free vlan tag if the path contains more than one switch
        if len(path) > 1:
            vlan = self.vlans.pop()
        else:
            vlan = None

        for i in range(0, len(path)):
            current_node = self.getNodeByName(current_hop)

            if path.index(current_hop) < len(path) - 1:
                next_hop = path[path.index(current_hop) + 1]
            else:
                # last switch reached
                next_hop = vnf_dst_name

            next_node = self.getNodeByName(next_hop)

            if next_hop == vnf_dst_name:
                switch_outport_nr = dst_sw_outport_nr
                logging.info("end node reached: {0}".format(vnf_dst_name))
            elif not isinstance( next_node, OVSSwitch ):
                logging.info("Next node: {0} is not a switch".format(next_hop))
                return "Next node: {0} is not a switch".format(next_hop)
            else:
                # take the first link between switches by default
                index_edge_out = 0
                switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']

            # set flow entry via ovs-ofctl
            if isinstance( current_node, OVSSwitch ):
                kwargs['vlan'] = vlan
                kwargs['path'] = path
                kwargs['current_hop'] = current_hop
                self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
                # TODO set entry via Ryu REST api (in case the emulator is running remotely...)

            # take the first link between switches by default
            if isinstance( next_node, OVSSwitch ):
                switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
            current_hop = next_hop

        return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
        match = 'in_port=%s' % switch_inport_nr

        cookie = kwargs.get('cookie')
        match_input = kwargs.get('match')
        cmd = kwargs.get('cmd')
        path = kwargs.get('path')
        current_hop = kwargs.get('current_hop')
        vlan = kwargs.get('vlan')

        s = ','
        if cookie:
            cookie = 'cookie=%s' % cookie
            match = s.join([cookie, match])
        if match_input:
            match = s.join([match, match_input])
        if cmd == 'add-flow':
            action = 'action=%s' % switch_outport_nr
            if vlan is not None:
                if path.index(current_hop) == 0:  # first node
                    action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
                    match = '-O OpenFlow13 ' + match
                elif path.index(current_hop) == len(path) - 1:  # last node
                    match += ',dl_vlan=%s' % vlan
                    action = 'action=strip_vlan,output=%s' % switch_outport_nr
                else:  # middle nodes
                    match += ',dl_vlan=%s' % vlan
            ofcmd = s.join([match, action])
        elif cmd == 'del-flows':
            ofcmd = match
        else:
            ofcmd = ''

        node.dpctl(cmd, ofcmd)
        logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
                                                                            switch_outport_nr, cmd))

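    # Illustrative ofcmd strings produced by the add-flow branch above (a
    # sketch derived from the code, assuming vlan tag 42, in_port 1, out_port 2):
    #   first hop:  '-O OpenFlow13 in_port=1,action=mod_vlan_vid:42,output=2'
    #   middle hop: 'in_port=1,dl_vlan=42,action=2'
    #   last hop:   'in_port=1,dl_vlan=42,action=strip_vlan,output=2'
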
    # start Ryu OpenFlow controller as remote controller for the DCNetwork
    def startRyu(self):
        # start Ryu controller with REST API
        python_install_path = site.getsitepackages()[0]
        ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
        ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
        # change the default OpenFlow controller port to 6653 (official IANA-assigned port number), as used by Mininet
        # Ryu still uses 6633 as default
        ryu_option = '--ofp-tcp-listen-port'
        ryu_of_port = '6653'
        ryu_cmd = 'ryu-manager'
        FNULL = open("/tmp/ryu.log", 'w')
        self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
        # no learning switch
        #self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
        time.sleep(1)

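    # The Popen call above is roughly equivalent to this shell command (the
    # site-packages path depends on the local Python installation):
    #   ryu-manager <site-packages>/ryu/app/simple_switch_13.py \
    #               <site-packages>/ryu/app/ofctl_rest.py --ofp-tcp-listen-port 6653
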
    def stopRyu(self):
        if self.ryu_process is not None:
            self.ryu_process.terminate()
            self.ryu_process.kill()
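
if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the original module): builds a
    # two-data-center topology and drops into the Mininet CLI. Requires root
    # privileges and working Dockernet/Ryu installations; the labels and the
    # link delay are arbitrary examples.
    logging.basicConfig(level=logging.INFO)
    net = DCNetwork(monitor=False)
    dc1 = net.addDatacenter("dc1")
    dc2 = net.addDatacenter("dc2")
    net.addLink(dc1, dc2, delay="10ms")
    net.start()
    net.CLI()
    net.stop()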