1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12
13
14
15 from mininet.net import Dockernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
class DCNetwork(Dockernet):
    """
    Wraps the original Mininet/Dockernet class and provides
    methods to add data centers, switches, etc.

    This class is used by topology definition scripts.
    """

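    # Example topology script (sketch; uses only methods defined in this class):
    #   net = DCNetwork(monitor=False)
    #   dc1 = net.addDatacenter("dc1")
    #   dc2 = net.addDatacenter("dc2")
    #   net.addLink(dc1, dc2, delay="10ms")
    #   net.start()
    #   net.CLI()
    #   net.stop()
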
    def __init__(self, controller=RemoteController, monitor=False,
                 dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
                 dc_emulation_max_mem=512,  # emulation max mem in MB
                 **kwargs):
        """
        Create an extended version of a Dockernet network.
        :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
        :param dc_emulation_max_mem: max. memory (MB) used by containers in data centers
        :param kwargs: pass-through of Mininet parameters
        :return:
        """
        self.dcs = {}

        # call original Dockernet.__init__ and set up the default controller
        Dockernet.__init__(
            self, switch=OVSKernelSwitch, **kwargs)

        # Ryu management
        self.ryu_process = None
        if controller == RemoteController:
            # start Ryu controller
            self.startRyu()

        # add the specified controller
        self.addController('c0', controller=controller)

        # graph of the complete DC network
        # (a MultiDiGraph, so parallel links and per-direction port attributes can be stored)
        self.DCNetwork_graph = nx.MultiDiGraph()

        # monitoring agent
        if monitor:
            self.monitor_agent = DCNetworkMonitor(self)
        else:
            self.monitor_agent = None

        # initialize resource model registrar
        self.rm_registrar = ResourceModelRegistrar(
            dc_emulation_max_cpu, dc_emulation_max_mem)

    def addDatacenter(self, label, metadata=None, resource_log_path=None):
        """
        Create and add a logical cloud data center to the network.
        """
        if label in self.dcs:
            raise Exception("Data center label already exists: %s" % label)
        # default to a fresh dict here to avoid Python's mutable default argument pitfall
        dc = Datacenter(label, metadata=metadata if metadata is not None else {},
                        resource_log_path=resource_log_path)
        dc.net = self  # set reference to network
        self.dcs[label] = dc
        dc.create()  # finally create the data center in our Mininet instance
        logging.info("added data center: %s" % label)
        return dc

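    # Example (sketch; metadata keys are free-form and up to the caller):
    #   dc1 = net.addDatacenter("dc1", metadata={"location": "paderborn"})
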
    def addLink(self, node1, node2, **params):
        """
        Wrapper for addLink that can also handle Datacenter objects
        (or their labels) as link end points.
        """
        assert node1 is not None
        assert node2 is not None
        logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
        # ensure type of node1
        if isinstance(node1, basestring):
            if node1 in self.dcs:
                node1 = self.dcs[node1].switch
        if isinstance(node1, Datacenter):
            node1 = node1.switch
        # ensure type of node2
        if isinstance(node2, basestring):
            if node2 in self.dcs:
                node2 = self.dcs[node2].switch
        if isinstance(node2, Datacenter):
            node2 = node2.switch
        # try to give containers a default IP
        if isinstance(node1, Docker):
            if "params1" not in params:
                params["params1"] = {}
            if "ip" not in params["params1"]:
                params["params1"]["ip"] = self.getNextIp()
        if isinstance(node2, Docker):
            if "params2" not in params:
                params["params2"] = {}
            if "ip" not in params["params2"]:
                params["params2"]["ip"] = self.getNextIp()
        # ensure that we allow TCLinks between data centers
        # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
        # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
        if "cls" not in params:
            params["cls"] = TCLink

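        # With TCLink, shaping parameters can be passed on the link, e.g. (sketch):
        #   net.addLink(dc1, dc2, delay="5ms", bw=10, loss=1)
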
        link = Dockernet.addLink(self, node1, node2, **params)

        # try to give container interfaces a default id
        node1_port_id = node1.ports[link.intf1]
        if isinstance(node1, Docker):
            if "id" in params["params1"]:
                node1_port_id = params["params1"]["id"]
        node1_port_name = link.intf1.name

        node2_port_id = node2.ports[link.intf2]
        if isinstance(node2, Docker):
            if "id" in params["params2"]:
                node2_port_id = params["params2"]["id"]
        node2_port_name = link.intf2.name

        # add an edge with the assigned port numbers to the graph, in both
        # directions between node1 and node2
        # port_id: id given in the descriptor (if available, otherwise same as port)
        # port: port number assigned by Dockernet
        attr_dict = {}
        # possible weight metrics allowed by the TCLink class:
        weight_metrics = ['bw', 'delay', 'jitter', 'loss']
        edge_attributes = [p for p in params if p in weight_metrics]
        for attr in edge_attributes:
            # strip unit suffixes like 'ms' (the graph needs plain numbers as weights);
            # cast to str first, since e.g. bw may be passed as a number
            match = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
            if match:
                attr_number = match.group(1)
            else:
                attr_number = None
            attr_dict[attr] = attr_number

        attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
                      'src_port_name': node1_port_name,
                      'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
                      'dst_port_name': node2_port_name}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)

        attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
                      'src_port_name': node2_port_name,
                      'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
                      'dst_port_name': node1_port_name}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)

        return link

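    # The recorded attributes can be queried from the graph afterwards, e.g. (sketch):
    #   net.DCNetwork_graph[node1.name][node2.name][0]['src_port_nr']
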
    def addDocker(self, label, **params):
        """
        Wrapper for addDocker method to use our custom container class.
        """
        self.DCNetwork_graph.add_node(label)
        return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)

    def removeDocker(self, label, **params):
        """
        Wrapper for removeDocker method to update the graph.
        """
        self.DCNetwork_graph.remove_node(label)
        return Dockernet.removeDocker(self, label, **params)

    def addSwitch(self, name, add_to_graph=True, **params):
        """
        Wrapper for addSwitch method to also store the switch in the graph.
        """
        if add_to_graph:
            self.DCNetwork_graph.add_node(name)
        return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)

    def getAllContainers(self):
        """
        Returns a list with all containers within all data centers.
        """
        all_containers = []
        for dc in self.dcs.itervalues():
            all_containers += dc.listCompute()
        return all_containers

    def start(self):
        # start all data centers, then the underlying Dockernet network
        for dc in self.dcs.itervalues():
            dc.start()
        Dockernet.start(self)

    def stop(self):
        # stop the monitor agent
        if self.monitor_agent is not None:
            self.monitor_agent.stop()

        # stop the emulator network
        Dockernet.stop(self)

        # stop the Ryu controller
        self.stopRyu()

    def CLI(self):
        CLI(self)

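    # Example chain setup (sketch; 'vnf1'/'vnf2' are assumed container names):
    #   net.setChain('vnf1', 'vnf2')                    # install flow entries
    #   net.setChain('vnf1', 'vnf2', cmd='del-flows')   # remove them again
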
    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None,
                 cmd='add-flow', weight=None):
        """
        Install (cmd='add-flow') or remove (cmd='del-flows') flow entries
        along the shortest switch path between two VNFs.
        """
        # check if a port is specified (vnf:port)
        if vnf_src_interface is None:
            # take the first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            vnf_src_interface = link_dict[0]['src_port_id']

        # find the switch the source VNF is attached to, and the switch in-port
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_src_interface:
                    # found the right link and connected switch
                    src_sw = connected_sw
                    src_sw_inport_nr = link_dict[link]['dst_port_nr']
                    break

        if vnf_dst_interface is None:
            # take the first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            vnf_dst_interface = link_dict[0]['dst_port_id']

        # strip an optional ':port' suffix from the destination name
        vnf_dst_name = vnf_dst_name.split(':')[0]
        # find the switch the destination VNF is attached to, and the switch out-port
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            for link in link_dict:
                if link_dict[link]['dst_port_id'] == vnf_dst_interface:
                    # found the right link and connected switch
                    dst_sw = connected_sw
                    dst_sw_outport_nr = link_dict[link]['src_port_nr']
                    break

        # get the shortest path between the two attachment switches
        try:
            # returns the first shortest path found;
            # if all shortest paths are wanted, use: all_shortest_paths
            # weight may name one of the edge attributes recorded in addLink (e.g. 'delay')
            path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=weight)
        except Exception:
            logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
            return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

        current_hop = src_sw
        switch_inport_nr = src_sw_inport_nr

        # walk the path switch by switch and install a flow entry on each one
        for i in range(len(path)):
            current_node = self.getNodeByName(current_hop)
            if i < len(path) - 1:
                next_hop = path[i + 1]
            else:
                # last switch reached
                next_hop = vnf_dst_name

            next_node = self.getNodeByName(next_hop)

            if next_hop == vnf_dst_name:
                switch_outport_nr = dst_sw_outport_nr
                logging.info("end node reached: {0}".format(vnf_dst_name))
            elif not isinstance(next_node, OVSSwitch):
                logging.info("Next node: {0} is not a switch".format(next_hop))
                return "Next node: {0} is not a switch".format(next_hop)
            else:
                # take the first link between switches by default
                index_edge_out = 0
                switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']

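            # On an OVS switch, the dpctl call below boils down to an ovs-ofctl
            # invocation per switch, e.g. (sketch):
            #   ovs-ofctl add-flow <switch> in_port=1,action=2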
            # set the flow entry via ovs-ofctl
            # TODO use the REST API of Ryu to set flow entries on the correct dpid
            # TODO this only maps in_port to out_port without further matching, so it
            #      will give trouble when multiple services are deployed...
            # TODO need more specific matches to fix this (e.g. VLAN tags)
            if isinstance(current_node, OVSSwitch):
                match = 'in_port=%s' % switch_inport_nr

                if cmd == 'add-flow':
                    action = 'action=%s' % switch_outport_nr
                    s = ','
                    ofcmd = s.join([match, action])
                elif cmd == 'del-flows':
                    ofcmd = match
                else:
                    ofcmd = ''

                current_node.dpctl(cmd, ofcmd)
                logging.info("{0} in switch: {1} in_port: {2} out_port: {3}".format(
                    cmd, current_node.name, switch_inport_nr, switch_outport_nr))
            # take the first link between switches by default
            if isinstance(next_node, OVSSwitch):
                switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
            current_hop = next_hop

        return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    # start Ryu OpenFlow controller as remote controller for the DCNetwork
    def startRyu(self):
        # start Ryu controller with REST API
        python_install_path = site.getsitepackages()[0]
        ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
        ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
        # change the default OpenFlow controller port to 6653 (the official IANA-assigned
        # port number), as used by Mininet; Ryu still uses 6633 as its default
        ryu_option = '--ofp-tcp-listen-port'
        ryu_of_port = '6653'
        ryu_cmd = 'ryu-manager'
        FNULL = open("/tmp/ryu.log", 'w')
        self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port],
                                 stdout=FNULL, stderr=FNULL)
        # variant without the learning switch app (REST API only):
        #self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
        # give Ryu a moment to start before the switches try to connect
        time.sleep(1)

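    # Equivalent shell invocation (sketch; <site-packages> is the Python site dir):
    #   ryu-manager <site-packages>/ryu/app/simple_switch_13.py \
    #       <site-packages>/ryu/app/ofctl_rest.py --ofp-tcp-listen-port 6653
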
    def stopRyu(self):
        if self.ryu_process is not None:
            # terminate, then force-kill, the Ryu child process
            self.ryu_process.terminate()
            self.ryu_process.kill()