248dc3481e61bc3337541eb6023a090f30d0af65
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Distributed Cloud Emulator (dcemulator)
3 (c) 2015 by Manuel Peuster <manuel.peuster@upb.de>
4 """
5 import logging
6
7 import site
8 import time
9 from subprocess import Popen
10 import os
11 import re
12 import urllib2
13 from functools import partial
14
15 from mininet.net import Dockernet
16 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
17 from mininet.cli import CLI
18 from mininet.link import TCLink
19 import networkx as nx
20 from emuvim.dcemulator.monitoring import DCNetworkMonitor
21 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
22 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
23
24 class DCNetwork(Dockernet):
25 """
26 Wraps the original Mininet/Dockernet class and provides
27 methods to add data centers, switches, etc.
28
29 This class is used by topology definition scripts.
30 """
31
32 def __init__(self, controller=RemoteController, monitor=False,
33 dc_emulation_max_cpu=1.0, # fraction of overall CPU time for emulation
34 dc_emulation_max_mem=512, # emulation max mem in MB
35 **kwargs):
36 """
37 Create an extended version of a Dockernet network
38 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
39 :param kwargs: path through for Mininet parameters
40 :return:
41 """
42 self.dcs = {}
43
44 # call original Docker.__init__ and setup default controller
45 Dockernet.__init__(
46 self, switch=OVSKernelSwitch, **kwargs)
47
48 # Ryu management
49 self.ryu_process = None
50 if controller == RemoteController:
51 # start Ryu controller
52 self.startRyu()
53
54 # add the specified controller
55 self.addController('c0', controller=controller)
56
57 # graph of the complete DC network
58 self.DCNetwork_graph = nx.MultiDiGraph()
59
60 # initialize pool of vlan tags to setup the SDN paths
61 self.vlans = range(4096)[::-1]
62
63 # link to Ryu REST_API
64 ryu_ip = '0.0.0.0'
65 ryu_port = '8080'
66 self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)
67
68 # monitoring agent
69 if monitor:
70 self.monitor_agent = DCNetworkMonitor(self)
71 else:
72 self.monitor_agent = None
73
74 # initialize resource model registrar
75 self.rm_registrar = ResourceModelRegistrar(
76 dc_emulation_max_cpu, dc_emulation_max_mem)
77
78 def addDatacenter(self, label, metadata={}, resource_log_path=None):
79 """
80 Create and add a logical cloud data center to the network.
81 """
82 if label in self.dcs:
83 raise Exception("Data center label already exists: %s" % label)
84 dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
85 dc.net = self # set reference to network
86 self.dcs[label] = dc
87 dc.create() # finally create the data center in our Mininet instance
88 logging.info("added data center: %s" % label)
89 return dc
90
91 def addLink(self, node1, node2, **params):
92 """
93 Able to handle Datacenter objects as link
94 end points.
95 """
96 assert node1 is not None
97 assert node2 is not None
98 logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
99 # ensure type of node1
100 if isinstance( node1, basestring ):
101 if node1 in self.dcs:
102 node1 = self.dcs[node1].switch
103 if isinstance( node1, Datacenter ):
104 node1 = node1.switch
105 # ensure type of node2
106 if isinstance( node2, basestring ):
107 if node2 in self.dcs:
108 node2 = self.dcs[node2].switch
109 if isinstance( node2, Datacenter ):
110 node2 = node2.switch
111 # try to give containers a default IP
112 if isinstance( node1, Docker ):
113 if "params1" not in params:
114 params["params1"] = {}
115 if "ip" not in params["params1"]:
116 params["params1"]["ip"] = self.getNextIp()
117 if isinstance( node2, Docker ):
118 if "params2" not in params:
119 params["params2"] = {}
120 if "ip" not in params["params2"]:
121 params["params2"]["ip"] = self.getNextIp()
122 # ensure that we allow TCLinks between data centers
123 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
124 # see Dockernet issue: https://github.com/mpeuster/dockernet/issues/3
125 if "cls" not in params:
126 params["cls"] = TCLink
127
128 link = Dockernet.addLink(self, node1, node2, **params)
129
130 # try to give container interfaces a default id
131 node1_port_id = node1.ports[link.intf1]
132 if isinstance(node1, Docker):
133 if "id" in params["params1"]:
134 node1_port_id = params["params1"]["id"]
135 node1_port_name = link.intf1.name
136
137 node2_port_id = node2.ports[link.intf2]
138 if isinstance(node2, Docker):
139 if "id" in params["params2"]:
140 node2_port_id = params["params2"]["id"]
141 node2_port_name = link.intf2.name
142
143
144 # add edge and assigned port number to graph in both directions between node1 and node2
145 # port_id: id given in descriptor (if available, otherwise same as port)
146 # port: portnumber assigned by Dockernet
147
148 attr_dict = {}
149 # possible weight metrics allowed by TClink class:
150 weight_metrics = ['bw', 'delay', 'jitter', 'loss']
151 edge_attributes = [p for p in params if p in weight_metrics]
152 for attr in edge_attributes:
153 # if delay: strip ms (need number as weight in graph)
154 match = re.search('([0-9]*\.?[0-9]+)', params[attr])
155 if match:
156 attr_number = match.group(1)
157 else:
158 attr_number = None
159 attr_dict[attr] = attr_number
160
161
162 attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
163 'src_port_name': node1_port_name,
164 'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
165 'dst_port_name': node2_port_name}
166 attr_dict2.update(attr_dict)
167 self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)
168
169 attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
170 'src_port_name': node2_port_name,
171 'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
172 'dst_port_name': node1_port_name}
173 attr_dict2.update(attr_dict)
174 self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)
175
176 return link
177
178 def addDocker( self, label, **params ):
179 """
180 Wrapper for addDocker method to use custom container class.
181 """
182 self.DCNetwork_graph.add_node(label)
183 return Dockernet.addDocker(self, label, cls=EmulatorCompute, **params)
184
185 def removeDocker( self, label, **params ):
186 """
187 Wrapper for removeDocker method to update graph.
188 """
189 self.DCNetwork_graph.remove_node(label)
190 return Dockernet.removeDocker(self, label, **params)
191
192 def addSwitch( self, name, add_to_graph=True, **params ):
193 """
194 Wrapper for addSwitch method to store switch also in graph.
195 """
196 if add_to_graph:
197 self.DCNetwork_graph.add_node(name)
198 return Dockernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
199
200 def getAllContainers(self):
201 """
202 Returns a list with all containers within all data centers.
203 """
204 all_containers = []
205 for dc in self.dcs.itervalues():
206 all_containers += dc.listCompute()
207 return all_containers
208
209 def start(self):
210 # start
211 for dc in self.dcs.itervalues():
212 dc.start()
213 Dockernet.start(self)
214
215 def stop(self):
216
217 # stop the monitor agent
218 if self.monitor_agent is not None:
219 self.monitor_agent.stop()
220
221 # stop emulator net
222 Dockernet.stop(self)
223
224 # stop Ryu controller
225 self.stopRyu()
226
227
228 def CLI(self):
229 CLI(self)
230
231 # to remove chain do setChain( src, dst, cmd='del-flows')
232 def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
233 cmd = kwargs.get('cmd')
234 if cmd == 'add-flow':
235 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
236 if kwargs.get('bidirectional'):
237 return ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
238
239 elif cmd == 'del-flows': # TODO: del-flow to be implemented
240 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
241 if kwargs.get('bidirectional'):
242 return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
243
244 else:
245 return "Command unknown"
246
247
248 def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
249
250 # TODO: this needs to be cleaned up
251 #check if port is specified (vnf:port)
252 if vnf_src_interface is None:
253 # take first interface by default
254 connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
255 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
256 vnf_src_interface = link_dict[0]['src_port_id']
257
258 for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
259 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
260 for link in link_dict:
261 if link_dict[link]['src_port_id'] == vnf_src_interface:
262 # found the right link and connected switch
263 src_sw = connected_sw
264
265 src_sw_inport_nr = link_dict[link]['dst_port_nr']
266 break
267
268 if vnf_dst_interface is None:
269 # take first interface by default
270 connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
271 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
272 vnf_dst_interface = link_dict[0]['dst_port_id']
273
274 vnf_dst_name = vnf_dst_name.split(':')[0]
275 for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
276 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
277 for link in link_dict:
278 if link_dict[link]['dst_port_id'] == vnf_dst_interface:
279 # found the right link and connected switch
280 dst_sw = connected_sw
281 dst_sw_outport_nr = link_dict[link]['src_port_nr']
282 break
283
284
285 # get shortest path
286 try:
287 # returns the first found shortest path
288 # if all shortest paths are wanted, use: all_shortest_paths
289 path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
290 except:
291 logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
292 return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
293
294 logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
295
296 current_hop = src_sw
297 switch_inport_nr = src_sw_inport_nr
298
299 # choose free vlan if path contains more than 1 switch
300 cmd = kwargs.get('cmd')
301 vlan = None
302 if cmd == 'add-flow':
303 if len(path) > 1:
304 vlan = self.vlans.pop()
305
306 for i in range(0,len(path)):
307 current_node = self.getNodeByName(current_hop)
308
309 if path.index(current_hop) < len(path)-1:
310 next_hop = path[path.index(current_hop)+1]
311 else:
312 #last switch reached
313 next_hop = vnf_dst_name
314
315 next_node = self.getNodeByName(next_hop)
316
317 if next_hop == vnf_dst_name:
318 switch_outport_nr = dst_sw_outport_nr
319 logging.info("end node reached: {0}".format(vnf_dst_name))
320 elif not isinstance( next_node, OVSSwitch ):
321 logging.info("Next node: {0} is not a switch".format(next_hop))
322 return "Next node: {0} is not a switch".format(next_hop)
323 else:
324 # take first link between switches by default
325 index_edge_out = 0
326 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
327
328
329 # set of entry via ovs-ofctl
330 if isinstance( current_node, OVSSwitch ):
331 kwargs['vlan'] = vlan
332 kwargs['path'] = path
333 kwargs['current_hop'] = current_hop
334 ## set flow entry via ovs-ofctl
335 #self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
336 ## set flow entry via ryu rest api
337 self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
338
339 # take first link between switches by default
340 if isinstance( next_node, OVSSwitch ):
341 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
342 current_hop = next_hop
343
344 return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
345
346 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
347 match = 'in_port=%s' % switch_inport_nr
348
349 cookie = kwargs.get('cookie')
350 match_input = kwargs.get('match')
351 cmd = kwargs.get('cmd')
352 path = kwargs.get('path')
353 current_hop = kwargs.get('current_hop')
354 vlan = kwargs.get('vlan')
355
356 s = ','
357 if match_input:
358 match = s.join([match, match_input])
359
360 flow = {}
361 flow['dpid'] = int(node.dpid, 16)
362 if cookie:
363 flow['cookie'] = int(cookie)
364
365
366 flow['actions'] = []
367
368 # possible Ryu actions, match fields:
369 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
370 if cmd == 'add-flow':
371 prefix = 'stats/flowentry/add'
372 action = {}
373 action['type'] = 'OUTPUT'
374 action['port'] = switch_outport_nr
375 flow['actions'].append(action)
376 if vlan != None:
377 if path.index(current_hop) == 0: # first node
378 action = {}
379 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
380 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
381 action['type'] = 'SET_FIELD'
382 action['field'] = 'vlan_vid'
383 action['value'] = vlan
384 flow['actions'].append(action)
385 elif path.index(current_hop) == len(path) - 1: # last node
386 match += ',dl_vlan=%s' % vlan
387 action = {}
388 action['type'] = 'POP_VLAN'
389 flow['actions'].append(action)
390 else: # middle nodes
391 match += ',dl_vlan=%s' % vlan
392 #flow['match'] = self._parse_match(match)
393 elif cmd == 'del-flows':
394 #del(flow['actions'])
395 prefix = 'stats/flowentry/delete'
396 if cookie:
397 flow['cookie_mask'] = cookie
398 #if cookie is None:
399 # flow['match'] = self._parse_match(match)
400
401 action = {}
402 action['type'] = 'OUTPUT'
403 action['port'] = switch_outport_nr
404 flow['actions'].append(action)
405
406 flow['match'] = self._parse_match(match)
407 self.ryu_REST(prefix, data=flow)
408
409 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
410 match = 'in_port=%s' % switch_inport_nr
411
412 cookie = kwargs.get('cookie')
413 match_input = kwargs.get('match')
414 cmd = kwargs.get('cmd')
415 path = kwargs.get('path')
416 current_hop = kwargs.get('current_hop')
417 vlan = kwargs.get('vlan')
418
419 s = ','
420 if cookie:
421 cookie = 'cookie=%s' % cookie
422 match = s.join([cookie, match])
423 if match_input:
424 match = s.join([match, match_input])
425 if cmd == 'add-flow':
426 action = 'action=%s' % switch_outport_nr
427 if vlan != None:
428 if path.index(current_hop) == 0: # first node
429 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
430 match = '-O OpenFlow13 ' + match
431 elif path.index(current_hop) == len(path) - 1: # last node
432 match += ',dl_vlan=%s' % vlan
433 action = 'action=strip_vlan,output=%s' % switch_outport_nr
434 else: # middle nodes
435 match += ',dl_vlan=%s' % vlan
436 ofcmd = s.join([match, action])
437 elif cmd == 'del-flows':
438 ofcmd = match
439 else:
440 ofcmd = ''
441
442 node.dpctl(cmd, ofcmd)
443 logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
444 switch_outport_nr, cmd))
445
446 # start Ryu Openflow controller as Remote Controller for the DCNetwork
447 def startRyu(self):
448 # start Ryu controller with rest-API
449 python_install_path = site.getsitepackages()[0]
450 ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
451 ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
452 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
453 # Ryu still uses 6633 as default
454 ryu_option = '--ofp-tcp-listen-port'
455 ryu_of_port = '6653'
456 ryu_cmd = 'ryu-manager'
457 FNULL = open("/tmp/ryu.log", 'w')
458 self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
459 # no learning switch
460 #self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
461 time.sleep(1)
462
463 def stopRyu(self):
464 if self.ryu_process is not None:
465 self.ryu_process.terminate()
466 self.ryu_process.kill()
467
468 def ryu_REST(self, prefix, dpid=None, data=None):
469 if dpid:
470 url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)
471 else:
472 url = self.ryu_REST_api + '/' + str(prefix)
473 if data:
474 #logging.info('POST: {0}'.format(str(data)))
475 req = urllib2.Request(url, str(data))
476 else:
477 req = urllib2.Request(url)
478
479 ret = urllib2.urlopen(req).read()
480 return ret
481
482 # need to respect that some match fields must be integers
483 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
484 def _parse_match(self, match):
485 matches = match.split(',')
486 dict = {}
487 for m in matches:
488 match = m.split('=')
489 if len(match) == 2:
490 try:
491 m2 = int(match[1], 0)
492 except:
493 m2 = match[1]
494
495 dict.update({match[0]:m2})
496 return dict
497