Add a weight metric (bw, delay, jitter, loss) when adding network links
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12
13
14
15 from mininet.net import Dockernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
class DCNetwork(Dockernet):
    """
    Wraps the original Mininet/Dockernet class and provides
    methods to add data centers, switches, etc.

    This class is used by topology definition scripts.
    """

    def __init__(self, controller=RemoteController, monitor=False,
                 dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
                 dc_emulation_max_mem=512,  # emulation max mem in MB
                 **kwargs):
        """
        Create an extended version of a Dockernet network.

        :param controller: controller class; when RemoteController is used,
            a Ryu controller process is started automatically
        :param monitor: attach a DCNetworkMonitor agent if True
        :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
        :param dc_emulation_max_mem: max. memory (MB) used by emulated containers
        :param kwargs: pass-through for Mininet parameters
        :return:
        """
        self.dcs = {}

        # call original Docker.__init__ and setup default controller
        Dockernet.__init__(
            self, switch=OVSKernelSwitch, **kwargs)

        # Ryu management
        self.ryu_process = None
        if controller == RemoteController:
            # start Ryu controller
            self.startRyu()

        # add the specified controller
        self.addController('c0', controller=controller)

        # graph of the complete DC network
        self.DCNetwork_graph = nx.MultiDiGraph()

        # monitoring agent
        if monitor:
            self.monitor_agent = DCNetworkMonitor(self)
        else:
            self.monitor_agent = None

        # initialize resource model registrar
        self.rm_registrar = ResourceModelRegistrar(
            dc_emulation_max_cpu, dc_emulation_max_mem)

    def addDatacenter(self, label, metadata=None, resource_log_path=None):
        """
        Create and add a logical cloud data center to the network.

        :param label: unique label of the new data center
        :param metadata: optional dict of user metadata (default: empty dict)
        :param resource_log_path: optional log path for the resource model
        :raises Exception: if the label already exists
        :return: the created Datacenter object
        """
        # use None instead of a mutable {} default argument to avoid the
        # shared-mutable-default pitfall (same dict reused across calls)
        if metadata is None:
            metadata = {}
        if label in self.dcs:
            raise Exception("Data center label already exists: %s" % label)
        dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
        dc.net = self  # set reference to network
        self.dcs[label] = dc
        dc.create()  # finally create the data center in our Mininet instance
        logging.info("added data center: %s" % label)
        return dc

    def addLink(self, node1, node2, **params):
        """
        Able to handle Datacenter objects as link end points.

        Also mirrors the link (with its port numbers and any TCLink weight
        metrics) into self.DCNetwork_graph in both directions.

        :param node1: node name, Datacenter, switch or Docker node
        :param node2: node name, Datacenter, switch or Docker node
        :param params: pass-through for Dockernet.addLink; TCLink options
            (bw, delay, jitter, loss) are additionally stored as numeric
            edge attributes usable as nx.shortest_path() weights
        :return: the created link object
        """
        assert node1 is not None
        assert node2 is not None
        logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
        # ensure type of node1
        if isinstance(node1, basestring):
            if node1 in self.dcs:
                node1 = self.dcs[node1].switch
        if isinstance(node1, Datacenter):
            node1 = node1.switch
        # ensure type of node2
        if isinstance(node2, basestring):
            if node2 in self.dcs:
                node2 = self.dcs[node2].switch
        if isinstance(node2, Datacenter):
            node2 = node2.switch
        # try to give containers a default IP
        if isinstance(node1, Docker):
            if "params1" not in params:
                params["params1"] = {}
            if "ip" not in params["params1"]:
                params["params1"]["ip"] = self.getNextIp()
        if isinstance(node2, Docker):
            if "params2" not in params:
                params["params2"] = {}
            if "ip" not in params["params2"]:
                params["params2"]["ip"] = self.getNextIp()
        # ensure that we allow TCLinks between data centers
        # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
        # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
        if "cls" not in params:
            params["cls"] = TCLink

        link = Dockernet.addLink(self, node1, node2, **params)

        # try to give container interfaces a default id
        # port_id: id given in descriptor (if available, otherwise same as port)
        # port: port number assigned by Dockernet
        node1_port_id = node1.ports[link.intf1]
        if isinstance(node1, Docker):
            if "id" in params["params1"]:
                node1_port_id = params["params1"]["id"]

        node2_port_id = node2.ports[link.intf2]
        if isinstance(node2, Docker):
            if "id" in params["params2"]:
                node2_port_id = params["params2"]["id"]

        # collect the weight metrics allowed by the TCLink class as edge
        # attributes so they can later be used as shortest-path weights
        attr_dict = {}
        weight_metrics = ['bw', 'delay', 'jitter', 'loss']
        for attr in [p for p in params if p in weight_metrics]:
            # strip any unit suffix (e.g. 'ms' on delay values); str() copes
            # with metrics given as plain numbers (e.g. bw=10), which would
            # otherwise make re.search() raise TypeError
            match = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
            # store as float: nx.shortest_path(weight=...) needs numbers,
            # string weights would fail during path-cost addition
            attr_dict[attr] = float(match.group(1)) if match else None

        # add edge and assigned port numbers to the graph in both directions
        attr_dict2 = {'src_port_id': node1_port_id, 'src_port': node1.ports[link.intf1],
                      'dst_port_id': node2_port_id, 'dst_port': node2.ports[link.intf2]}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)

        attr_dict2 = {'src_port_id': node2_port_id, 'src_port': node2.ports[link.intf2],
                      'dst_port_id': node1_port_id, 'dst_port': node1.ports[link.intf1]}
        attr_dict2.update(attr_dict)
        self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)

        return link

    def addDocker(self, label, **params):
        """
        Wrapper for addDocker method to use custom container class
        and register the node in the network graph.
        """
        self.DCNetwork_graph.add_node(label)
        return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)

    def removeDocker(self, label, **params):
        """
        Wrapper for removeDocker method to update the network graph.
        """
        self.DCNetwork_graph.remove_node(label)
        return Dockernet.removeDocker(self, label, **params)

    def addSwitch(self, name, add_to_graph=True, **params):
        """
        Wrapper for addSwitch method to store the switch also in the graph.

        :param add_to_graph: register the switch as a graph node if True
        """
        if add_to_graph:
            self.DCNetwork_graph.add_node(name)
        return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)

    def getAllContainers(self):
        """
        Returns a list with all containers within all data centers.
        """
        all_containers = []
        for dc in self.dcs.itervalues():
            all_containers += dc.listCompute()
        return all_containers

    def start(self):
        """
        Start all data centers, then the underlying Dockernet network.
        """
        for dc in self.dcs.itervalues():
            dc.start()
        Dockernet.start(self)

    def stop(self):
        """
        Stop the monitoring agent, the emulated network and the Ryu controller.
        """
        # stop the monitor agent
        if self.monitor_agent is not None:
            self.monitor_agent.stop()

        # stop emulator net
        Dockernet.stop(self)

        # stop Ryu controller
        self.stopRyu()

    def CLI(self):
        """Drop into the interactive Mininet CLI for this network."""
        CLI(self)

    # to remove chain do setChain( src, dst, cmd='del-flows')
    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, cmd='add-flow', weight=None):
        """
        Set up (or remove) flow entries along the shortest path between two VNFs.

        :param vnf_src_name: name of the source VNF node in the graph
        :param vnf_dst_name: name of the destination VNF node (an optional
            ':port' suffix is stripped)
        :param vnf_src_interface: source port id; first interface if None
        :param vnf_dst_interface: destination port id; first interface if None
        :param cmd: ovs-ofctl command, 'add-flow' to install, 'del-flows' to remove
        :param weight: edge attribute name (e.g. 'delay') used as path weight;
            hop count is used if None
        :return: status message string
        """
        # check if port is specified (vnf:port)
        if vnf_src_interface is None:
            # take first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            vnf_src_interface = link_dict[0]['src_port_id']

        # initialize to None so an unresolved interface yields a clean error
        # message instead of an UnboundLocalError further down
        src_sw = None
        src_sw_inport = None
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
            link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_src_interface:
                    # found the right link and connected switch
                    src_sw = connected_sw
                    src_sw_inport = link_dict[link]['dst_port']
                    break

        if vnf_dst_interface is None:
            # take first interface by default
            connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            vnf_dst_interface = link_dict[0]['dst_port_id']

        # NOTE(review): only the destination name is stripped of a ':port'
        # suffix here — presumably callers pass 'vnf:port'; confirm callers
        vnf_dst_name = vnf_dst_name.split(':')[0]
        dst_sw = None
        dst_sw_outport = None
        for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
            link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
            for link in link_dict:
                if link_dict[link]['dst_port_id'] == vnf_dst_interface:
                    # found the right link and connected switch
                    dst_sw = connected_sw
                    dst_sw_outport = link_dict[link]['src_port']
                    break

        if src_sw is None or dst_sw is None:
            logging.info("No attachment point found for {0} or {1}".format(vnf_src_name, vnf_dst_name))
            return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        # get shortest path (weight refers to the edge metrics set by addLink)
        try:
            path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=weight)
        except nx.NetworkXException:
            # narrow except: only catch networkx errors (no path, unknown node)
            # instead of a bare except that would also swallow KeyboardInterrupt
            logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
            return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

        logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

        current_hop = src_sw
        switch_inport = src_sw_inport

        # walk the path switch by switch and install one flow entry per hop
        for _ in path:
            current_node = self.getNodeByName(current_hop)
            if path.index(current_hop) < len(path) - 1:
                next_hop = path[path.index(current_hop) + 1]
            else:
                # last switch reached
                next_hop = vnf_dst_name

            next_node = self.getNodeByName(next_hop)

            if next_hop == vnf_dst_name:
                switch_outport = dst_sw_outport
                logging.info("end node reached: {0}".format(vnf_dst_name))
            elif not isinstance(next_node, OVSSwitch):
                logging.info("Next node: {0} is not a switch".format(next_hop))
                return "Next node: {0} is not a switch".format(next_hop)
            else:
                # take first link between switches by default
                # TODO take into account that multiple edges are possible between 2 nodes
                index_edge_out = 0
                switch_outport = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port']

            # set flow entry via ovs-ofctl
            # TODO use rest API of ryu to set flow entries to correct dpid
            # TODO this only sets port in to out, no match, so this will give trouble when multiple services are deployed...
            # TODO need multiple matches to do this (VLAN tags)
            if isinstance(current_node, OVSSwitch):
                match = 'in_port=%s' % switch_inport

                if cmd == 'add-flow':
                    action = 'action=%s' % switch_outport
                    s = ','
                    ofcmd = s.join([match, action])
                elif cmd == 'del-flows':
                    ofcmd = match
                else:
                    ofcmd = ''

                current_node.dpctl(cmd, ofcmd)
                logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport,
                                                                                         switch_outport))
            # take first link between switches by default
            if isinstance(next_node, OVSSwitch):
                switch_inport = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port']
            current_hop = next_hop

        return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    # start Ryu Openflow controller as Remote Controller for the DCNetwork
    def startRyu(self):
        """
        Start a Ryu controller (learning switch + REST API) as a subprocess.
        """
        # start Ryu controller with rest-API
        python_install_path = site.getsitepackages()[0]
        ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
        ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
        # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
        # Ryu still uses 6633 as default
        ryu_option = '--ofp-tcp-listen-port'
        ryu_of_port = '6653'
        ryu_cmd = 'ryu-manager'
        # NOTE: file handle stays open for the lifetime of the Ryu process
        FNULL = open("/tmp/ryu.log", 'w')
        self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
        # give the controller a moment to come up before switches connect
        time.sleep(1)

    def stopRyu(self):
        """
        Terminate the Ryu controller subprocess, if one was started.
        """
        if self.ryu_process is not None:
            self.ryu_process.terminate()
            self.ryu_process.kill()
354