51f53a3bda00765245ca16d771eccd107c58d4ef
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28 import logging
29
30 import site
31 import time
32 from subprocess import Popen
33 import os
34 import re
35 import urllib2
36 from functools import partial
37
38 from mininet.net import Containernet
39 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
40 from mininet.cli import CLI
41 from mininet.link import TCLink
42 import networkx as nx
43 from emuvim.dcemulator.monitoring import DCNetworkMonitor
44 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
45 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
46
# Module-level logger used by everything in this file. Its own level is set
# to DEBUG so that the LOG.debug() calls below are not filtered by this logger.
LOG = logging.getLogger("dcemulator.net")
LOG.setLevel(logging.DEBUG)
49
50 class DCNetwork(Containernet):
51 """
52 Wraps the original Mininet/Containernet class and provides
53 methods to add data centers, switches, etc.
54
55 This class is used by topology definition scripts.
56 """
57
def __init__(self, controller=RemoteController, monitor=False,
             enable_learning=True,  # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
             dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
             dc_emulation_max_mem=512,  # emulation max mem in MB
             **kwargs):
    """
    Create an extended version of a Containernet network.

    :param controller: controller class handed to Containernet; if it is
        RemoteController, a Ryu process is started through startRyu()
    :param monitor: if true, a DCNetworkMonitor is attached to the network
    :param enable_learning: forwarded to startRyu() as ``learning_switch``
    :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
    :param dc_emulation_max_mem: max. memory (MB) used by containers in data centers
    :param kwargs: passed through to Containernet
    :return:
    """
    # registry of data centers, keyed by label (filled by addDatacenter)
    self.dcs = {}

    # call original Containernet.__init__ and setup default controller
    Containernet.__init__(
        self, switch=OVSKernelSwitch, controller=controller, **kwargs)

    # Ryu management
    self.ryu_process = None
    if controller == RemoteController:
        # start Ryu controller
        self.startRyu(learning_switch=enable_learning)

    # add the specified controller
    self.addController('c0', controller=controller)

    # graph of the complete DC network
    self.DCNetwork_graph = nx.MultiDiGraph()

    # Pool of VLAN tags used to set up the SDN paths. VLAN IDs 0 and 4095
    # are reserved by IEEE 802.1Q, so only 1..4094 are handed out. The pool
    # is a real list (needed for pop()) stored in descending order, so that
    # pop() returns the lowest free tag first.
    self.vlans = list(range(1, 4095))[::-1]

    # link to Ryu REST_API
    ryu_ip = '0.0.0.0'
    ryu_port = '8080'
    self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)

    # monitoring agent
    if monitor:
        self.monitor_agent = DCNetworkMonitor(self)
    else:
        self.monitor_agent = None

    # initialize resource model registrar
    self.rm_registrar = ResourceModelRegistrar(
        dc_emulation_max_cpu, dc_emulation_max_mem)
105
def addDatacenter(self, label, metadata=None, resource_log_path=None):
    """
    Create and add a logical cloud data center to the network.

    :param label: unique label of the data center
    :param metadata: optional metadata dict handed to the Datacenter; a
        fresh empty dict is used when omitted
    :param resource_log_path: handed unchanged to the Datacenter
    :return: the created Datacenter object
    :raises Exception: if a data center with this label already exists
    """
    if label in self.dcs:
        raise Exception("Data center label already exists: %s" % label)
    # never share one default dict between all data centers
    if metadata is None:
        metadata = {}
    dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
    dc.net = self  # set reference to network
    self.dcs[label] = dc
    dc.create()  # finally create the data center in our Mininet instance
    LOG.info("added data center: %s" % label)
    return dc
118
def addLink(self, node1, node2, **params):
    """
    Create a link between two nodes and mirror it in the network graph.

    Able to handle Datacenter objects (or data center labels) as link
    end points; they are replaced by the data center's switch. Any other
    string is looked up as the name of an existing node.

    :param node1: node object, Datacenter, data center label or node name
    :param node2: node object, Datacenter, data center label or node name
    :param params: link parameters passed to Containernet.addLink. The
        optional dicts ``params1``/``params2`` may carry ``ip`` and ``id``
        for the respective end point; ``bw``, ``delay``, ``jitter`` and
        ``loss`` are additionally stored as numeric edge attributes.
    :return: the link object created by Containernet
    """
    assert node1 is not None
    assert node2 is not None
    LOG.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))

    def resolve(node):
        # map labels / Datacenter objects to the DC switch and plain node
        # names to node objects, so that .ports and .name are usable below
        if isinstance(node, basestring):
            if node in self.dcs:
                return self.dcs[node].switch
            return self.getNodeByName(node)
        if isinstance(node, Datacenter):
            return node.switch
        return node

    node1 = resolve(node1)
    node2 = resolve(node2)

    # try to give containers a default IP
    if isinstance(node1, Docker):
        if "params1" not in params:
            params["params1"] = {}
        if "ip" not in params["params1"]:
            params["params1"]["ip"] = self.getNextIp()
    if isinstance(node2, Docker):
        if "params2" not in params:
            params["params2"] = {}
        if "ip" not in params["params2"]:
            params["params2"]["ip"] = self.getNextIp()

    # ensure that we allow TCLinks between data centers
    # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
    # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
    if "cls" not in params:
        params["cls"] = TCLink

    link = Containernet.addLink(self, node1, node2, **params)

    # port_nr: port number assigned by Containernet
    # port_id: id given in descriptor (if available, otherwise same as port_nr)
    node1_port_nr = node1.ports[link.intf1]
    node1_port_id = node1_port_nr
    if isinstance(node1, Docker) and "id" in params["params1"]:
        node1_port_id = params["params1"]["id"]
    node1_port_name = link.intf1.name

    node2_port_nr = node2.ports[link.intf2]
    node2_port_id = node2_port_nr
    if isinstance(node2, Docker) and "id" in params["params2"]:
        node2_port_id = params["params2"]["id"]
    node2_port_name = link.intf2.name

    # possible weight metrics allowed by TCLink class; the value may be a
    # number (bw=10) or a string with unit (delay='5ms'), so it is turned
    # into a string first and stored as float to be usable as graph weight
    weight_metrics = ('bw', 'delay', 'jitter', 'loss')
    number_pattern = re.compile(r'([0-9]*\.?[0-9]+)')
    attr_dict = {}
    for attr in weight_metrics:
        if attr not in params:
            continue
        match = number_pattern.search(str(params[attr]))
        attr_dict[attr] = float(match.group(1)) if match else None

    # add edge and assigned port numbers to graph in both directions
    forward = {'src_port_id': node1_port_id, 'src_port_nr': node1_port_nr,
               'src_port_name': node1_port_name,
               'dst_port_id': node2_port_id, 'dst_port_nr': node2_port_nr,
               'dst_port_name': node2_port_name}
    forward.update(attr_dict)
    # keyword expansion instead of attr_dict=...: same result on networkx 1.x
    # and still correct on releases that dropped the attr_dict parameter
    self.DCNetwork_graph.add_edge(node1.name, node2.name, **forward)

    backward = {'src_port_id': node2_port_id, 'src_port_nr': node2_port_nr,
                'src_port_name': node2_port_name,
                'dst_port_id': node1_port_id, 'dst_port_nr': node1_port_nr,
                'dst_port_name': node1_port_name}
    backward.update(attr_dict)
    self.DCNetwork_graph.add_edge(node2.name, node1.name, **backward)

    return link
205
def addDocker(self, label, **params):
    """
    Register ``label`` as a graph node and create the container through
    Containernet, always using the EmulatorCompute container class.

    :param label: name of the container
    :param params: passed through to Containernet.addDocker
    :return: whatever Containernet.addDocker returns
    """
    graph = self.DCNetwork_graph
    graph.add_node(label)
    container = Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
    return container
212
def removeDocker(self, label, **params):
    """
    Drop ``label`` from the network graph, then remove the container
    through Containernet.

    :param label: name of the container
    :param params: passed through to Containernet.removeDocker
    :return: whatever Containernet.removeDocker returns
    """
    graph = self.DCNetwork_graph
    graph.remove_node(label)
    result = Containernet.removeDocker(self, label, **params)
    return result
219
def addSwitch(self, name, add_to_graph=True, **params):
    """
    Wrapper for addSwitch method to store switch also in graph.

    :param name: switch name
    :param add_to_graph: if true, the switch is also added as graph node
    :param params: passed through to Containernet.addSwitch; ``protocols``
        defaults to OpenFlow 1.0/1.2/1.3 but may be overridden by the caller
    :return: whatever Containernet.addSwitch returns
    """
    if add_to_graph:
        self.DCNetwork_graph.add_node(name)
    # setdefault avoids "got multiple values for keyword argument" when the
    # caller supplies its own protocols list
    params.setdefault('protocols', 'OpenFlow10,OpenFlow12,OpenFlow13')
    return Containernet.addSwitch(self, name, **params)
227
def getAllContainers(self):
    """
    Collect the compute instances of all registered data centers.

    :return: one flat list holding the listCompute() results of every
        data center in self.dcs
    """
    containers = []
    for datacenter in self.dcs.itervalues():
        containers.extend(datacenter.listCompute())
    return containers
236
def start(self):
    """
    Start every registered data center first and the underlying
    Containernet network afterwards.
    """
    datacenters = self.dcs.itervalues()
    for datacenter in datacenters:
        datacenter.start()
    Containernet.start(self)
242
def stop(self):
    """
    Stop the monitor agent (if any), the emulated network and finally the
    Ryu controller process.

    The controller is stopped in a ``finally`` clause so that a failure
    while shutting down the monitor or the network does not leave the
    Ryu process running; such a failure is still raised to the caller.
    """
    try:
        # stop the monitor agent
        if self.monitor_agent is not None:
            self.monitor_agent.stop()

        # stop emulator net
        Containernet.stop(self)
    finally:
        # stop Ryu controller
        self.stopRyu()
254
255
def CLI(self):
    """
    Open the command line interface (mininet.cli.CLI) on this network.
    """
    # inside the method body the name CLI resolves to the imported
    # module-level class, not to this method
    CLI(self)
258
259 # to remove chain do setChain( src, dst, cmd='del-flows')
def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Add or delete the flow entries of a chain between two VNFs.

    :param vnf_src_name: name of the source VNF
    :param vnf_dst_name: name of the destination VNF
    :param vnf_src_interface: source interface id (optional)
    :param vnf_dst_interface: destination interface id (optional)
    :param kwargs: ``cmd`` selects the operation ('add-flow' or
        'del-flows'); ``bidirectional`` additionally handles the reverse
        direction; everything is forwarded to _chainAddFlow
    :return: result message(s) of _chainAddFlow, joined by a newline for
        bidirectional chains, or "Command unknown"
    """
    if kwargs.get('cmd') not in ('add-flow', 'del-flows'):
        return "Command unknown"

    # both commands take the same route; _chainAddFlow evaluates cmd itself
    result = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
    if not kwargs.get('bidirectional'):
        return result

    reverse = self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
    return result + '\n' + reverse
276
277
def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Set (or delete) the flow entries along the shortest switch path between
    an interface of the source VNF and an interface of the destination VNF.

    :param vnf_src_name: graph node name of the source VNF
    :param vnf_dst_name: graph node name of the destination VNF; an optional
        ":port" suffix is ignored
    :param vnf_src_interface: port id at the source VNF (default: first link)
    :param vnf_dst_interface: port id at the destination VNF (default: first link)
    :param kwargs: ``cmd`` ('add-flow' / 'del-flows'), optional ``weight``
        (edge attribute used as path weight); all kwargs are forwarded to the
        flow-entry helpers, extended by ``vlan``, ``path`` and ``current_hop``
    :return: a human readable result or error message
    """
    graph = self.DCNetwork_graph

    # strip the port suffix before the name is used as a graph node
    vnf_dst_name = vnf_dst_name.split(':')[0]
    no_path_msg = "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    # --- locate the switch and switch port the source interface is attached to
    if vnf_src_interface is None:
        # take first interface by default
        first_sw = list(graph.neighbors(vnf_src_name))[0]
        vnf_src_interface = graph[vnf_src_name][first_sw][0]['src_port_id']

    src_sw = None
    src_sw_inport_nr = None
    for connected_sw in graph.neighbors(vnf_src_name):
        for edge in graph[vnf_src_name][connected_sw].values():
            if edge['src_port_id'] == vnf_src_interface:
                # found the right link and connected switch
                src_sw = connected_sw
                src_sw_inport_nr = edge['dst_port_nr']
                break
        if src_sw is not None:
            break

    # --- locate the switch and switch port the destination interface is attached to
    if vnf_dst_interface is None:
        # take first interface by default
        first_sw = list(graph.neighbors(vnf_dst_name))[0]
        vnf_dst_interface = graph[first_sw][vnf_dst_name][0]['dst_port_id']

    dst_sw = None
    dst_sw_outport_nr = None
    for connected_sw in graph.neighbors(vnf_dst_name):
        for edge in graph[connected_sw][vnf_dst_name].values():
            if edge['dst_port_id'] == vnf_dst_interface:
                # found the right link and connected switch
                dst_sw = connected_sw
                dst_sw_outport_nr = edge['src_port_nr']
                break
        if dst_sw is not None:
            break

    # an unknown interface used to surface as a swallowed UnboundLocalError;
    # report it explicitly but keep the message callers already know
    if src_sw is None or dst_sw is None:
        LOG.error("Interface not found in graph: {0}:{1} -> {2}:{3}".format(
            vnf_src_name, vnf_src_interface, vnf_dst_name, vnf_dst_interface))
        return no_path_msg

    # get shortest path
    try:
        # returns the first found shortest path
        # if all shortest paths are wanted, use: all_shortest_paths
        path = nx.shortest_path(graph, src_sw, dst_sw, weight=kwargs.get('weight'))
    except Exception:
        # best effort: any failure of the path search is reported as "no path"
        LOG.exception(no_path_msg)
        LOG.debug("Graph nodes: %r" % graph.nodes())
        LOG.debug("Graph edges: %r" % graph.edges())
        return no_path_msg

    LOG.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

    # choose free vlan if path contains more than 1 switch
    cmd = kwargs.get('cmd')
    vlan = None
    if cmd == 'add-flow' and len(path) > 1:
        vlan = self.vlans.pop()

    switch_inport_nr = src_sw_inport_nr
    last_index = len(path) - 1
    for index, current_hop in enumerate(path):
        current_node = self.getNodeByName(current_hop)

        # after the last switch the next hop is the destination VNF itself
        next_hop = path[index + 1] if index < last_index else vnf_dst_name
        next_node = self.getNodeByName(next_hop)

        if next_hop == vnf_dst_name:
            switch_outport_nr = dst_sw_outport_nr
            LOG.info("end node reached: {0}".format(vnf_dst_name))
        elif not isinstance(next_node, OVSSwitch):
            LOG.info("Next node: {0} is not a switch".format(next_hop))
            return "Next node: {0} is not a switch".format(next_hop)
        else:
            # take first link between switches by default
            switch_outport_nr = graph[current_hop][next_hop][0]['src_port_nr']

        # set of entry via ovs-ofctl / Ryu
        if isinstance(current_node, OVSSwitch):
            kwargs['vlan'] = vlan
            kwargs['path'] = path
            kwargs['current_hop'] = current_hop

            if self.controller == RemoteController:
                # set flow entry via ryu rest api
                self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
            else:
                # set flow entry via ovs-ofctl
                self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)

        # take first link between switches by default
        if isinstance(next_node, OVSSwitch):
            switch_inport_nr = graph[current_hop][next_hop][0]['dst_port_nr']

    return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
381
382 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
383 match = 'in_port=%s' % switch_inport_nr
384
385 cookie = kwargs.get('cookie')
386 match_input = kwargs.get('match')
387 cmd = kwargs.get('cmd')
388 path = kwargs.get('path')
389 current_hop = kwargs.get('current_hop')
390 vlan = kwargs.get('vlan')
391
392 s = ','
393 if match_input:
394 match = s.join([match, match_input])
395
396 flow = {}
397 flow['dpid'] = int(node.dpid, 16)
398
399 if cookie:
400 flow['cookie'] = int(cookie)
401
402
403 flow['actions'] = []
404
405 # possible Ryu actions, match fields:
406 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
407 if cmd == 'add-flow':
408 prefix = 'stats/flowentry/add'
409 if vlan != None:
410 if path.index(current_hop) == 0: # first node
411 action = {}
412 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
413 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
414 flow['actions'].append(action)
415 action = {}
416 action['type'] = 'SET_FIELD'
417 action['field'] = 'vlan_vid'
418 action['value'] = vlan
419 flow['actions'].append(action)
420 elif path.index(current_hop) == len(path) - 1: # last node
421 match += ',dl_vlan=%s' % vlan
422 action = {}
423 action['type'] = 'POP_VLAN'
424 flow['actions'].append(action)
425 else: # middle nodes
426 match += ',dl_vlan=%s' % vlan
427 # output action must come last
428 action = {}
429 action['type'] = 'OUTPUT'
430 action['port'] = switch_outport_nr
431 flow['actions'].append(action)
432
433 elif cmd == 'del-flows':
434 prefix = 'stats/flowentry/delete'
435
436 if cookie:
437 # TODO: add cookie_mask as argument
438 flow['cookie_mask'] = int('0xffffffffffffffff', 16) # need full mask to match complete cookie
439
440 action = {}
441 action['type'] = 'OUTPUT'
442 action['port'] = switch_outport_nr
443 flow['actions'].append(action)
444
445 flow['match'] = self._parse_match(match)
446 self.ryu_REST(prefix, data=flow)
447
def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
    """
    Add or delete one flow entry on ``node`` by calling its dpctl() method.

    :param node: switch object offering dpctl() and ``name``
    :param switch_inport_nr: port number matched as in_port
    :param switch_outport_nr: port number the traffic is sent out on
    :param kwargs: ``cmd`` ('add-flow' or 'del-flows'), optional ``cookie``,
        ``match`` (extra match fields), ``vlan``, ``path`` and
        ``current_hop``
    """
    cookie = kwargs.get('cookie')
    match_input = kwargs.get('match')
    cmd = kwargs.get('cmd')
    path = kwargs.get('path')
    current_hop = kwargs.get('current_hop')
    vlan = kwargs.get('vlan')

    # assemble "cookie=..,in_port=..,<extra match>" from the parts present
    fields = []
    if cookie:
        fields.append('cookie=%s' % cookie)
    fields.append('in_port=%s' % switch_inport_nr)
    if match_input:
        fields.append(match_input)
    match = ','.join(fields)

    if cmd == 'del-flows':
        ofcmd = match
    elif cmd == 'add-flow':
        action = 'action=%s' % switch_outport_nr
        if vlan is not None:
            hop_index = path.index(current_hop)
            if hop_index == 0:
                # first node: tag the traffic
                action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
                match = '-O OpenFlow13 ' + match
            else:
                # middle and last nodes match on the tag ...
                match += ',dl_vlan=%s' % vlan
                if hop_index == len(path) - 1:
                    # ... and the last node removes it again
                    action = 'action=strip_vlan,output=%s' % switch_outport_nr
        ofcmd = ','.join([match, action])
    else:
        ofcmd = ''

    node.dpctl(cmd, ofcmd)
    LOG.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
                                                                    switch_outport_nr, cmd))
484
485 # start Ryu Openflow controller as Remote Controller for the DCNetwork
def startRyu(self, learning_switch=True):
    """
    Start the Ryu OpenFlow controller (with its REST API app) as a child
    process and remember it in self.ryu_process.

    :param learning_switch: if true, Ryu's simple_switch_13 app is loaded
        in addition to ofctl_rest
    """
    # start Ryu controller with rest-API
    python_install_path = site.getsitepackages()[0]
    ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
    ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
    # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
    # Ryu still uses 6633 as default
    ryu_option = '--ofp-tcp-listen-port'
    ryu_of_port = '6653'
    ryu_cmd = 'ryu-manager'

    if learning_switch:
        ryu_apps = [ryu_path, ryu_path2]
    else:
        # no learning switch, but with rest api
        ryu_apps = [ryu_path2]

    # the child process gets its own copy of the log file descriptor, so
    # the parent's handle can be closed as soon as Popen has returned
    with open("/tmp/ryu.log", 'w') as ryu_log:
        self.ryu_process = Popen([ryu_cmd] + ryu_apps + [ryu_option, ryu_of_port],
                                 stdout=ryu_log, stderr=ryu_log)
    # give the controller a moment to come up
    time.sleep(1)
503
def stopRyu(self):
    """
    Stop the Ryu controller process started by startRyu(), if any.

    The process is only signalled while it is still running, it is waited
    for so that no zombie is left behind, and the reference is cleared so
    that the method can safely be called more than once.
    """
    process = self.ryu_process
    if process is None:
        return
    if process.poll() is None:
        process.terminate()
        process.kill()
    # reap the child
    process.wait()
    self.ryu_process = None
508
def ryu_REST(self, prefix, dpid=None, data=None):
    """
    Send a request to the Ryu REST API and return the response body.

    :param prefix: path below the API root, e.g. 'stats/flowentry/add'
    :param dpid: optional datapath id appended to the path
    :param data: optional payload; if given, its str() form is sent as
        request body (which makes it a POST), otherwise a GET is issued
    :return: the response body, or None if the request failed
    """
    # build the URL outside the try block so that it is always defined
    # when the error handler logs it
    url = self.ryu_REST_api + '/' + str(prefix)
    if dpid:
        url = url + '/' + str(dpid)

    try:
        if data:
            req = urllib2.Request(url, str(data))
        else:
            req = urllib2.Request(url)
        return urllib2.urlopen(req).read()
    except Exception:
        # best effort: report the failure (including the cause) and carry on
        LOG.exception('error url: {0}'.format(str(url)))
        if data:
            LOG.info('error POST: {0}'.format(str(data)))
        return None
526
527 # need to respect that some match fields must be integers
528 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
529 def _parse_match(self, match):
530 matches = match.split(',')
531 dict = {}
532 for m in matches:
533 match = m.split('=')
534 if len(match) == 2:
535 try:
536 m2 = int(match[1], 0)
537 except:
538 m2 = match[1]
539
540 dict.update({match[0]:m2})
541 return dict
542