Fix: Fixed crash-bug when setChain gets interface names instead of nummeric ports...
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28 import logging
29
30 import site
31 import time
32 from subprocess import Popen
33 import os
34 import re
35 import urllib2
36 from functools import partial
37
38 from mininet.net import Containernet
39 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
40 from mininet.cli import CLI
41 from mininet.link import TCLink
42 import networkx as nx
43 from emuvim.dcemulator.monitoring import DCNetworkMonitor
44 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
45 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
46
47 LOG = logging.getLogger("dcemulator.net")
48 LOG.setLevel(logging.DEBUG)
49
50 class DCNetwork(Containernet):
51 """
52 Wraps the original Mininet/Containernet class and provides
53 methods to add data centers, switches, etc.
54
55 This class is used by topology definition scripts.
56 """
57
58 def __init__(self, controller=RemoteController, monitor=False,
59 enable_learning = True, # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
60 dc_emulation_max_cpu=1.0, # fraction of overall CPU time for emulation
61 dc_emulation_max_mem=512, # emulation max mem in MB
62 **kwargs):
63 """
64 Create an extended version of a Containernet network
65 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
66 :param kwargs: path through for Mininet parameters
67 :return:
68 """
69 self.dcs = {}
70
71 # call original Docker.__init__ and setup default controller
72 Containernet.__init__(
73 self, switch=OVSKernelSwitch, controller=controller, **kwargs)
74
75
76 # Ryu management
77 self.ryu_process = None
78 if controller == RemoteController:
79 # start Ryu controller
80 self.startRyu(learning_switch=enable_learning)
81
82 # add the specified controller
83 self.addController('c0', controller=controller)
84
85 # graph of the complete DC network
86 self.DCNetwork_graph = nx.MultiDiGraph()
87
88 # initialize pool of vlan tags to setup the SDN paths
89 self.vlans = range(4096)[::-1]
90
91 # link to Ryu REST_API
92 ryu_ip = '0.0.0.0'
93 ryu_port = '8080'
94 self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)
95
96 # monitoring agent
97 if monitor:
98 self.monitor_agent = DCNetworkMonitor(self)
99 else:
100 self.monitor_agent = None
101
102 # initialize resource model registrar
103 self.rm_registrar = ResourceModelRegistrar(
104 dc_emulation_max_cpu, dc_emulation_max_mem)
105
106 def addDatacenter(self, label, metadata={}, resource_log_path=None):
107 """
108 Create and add a logical cloud data center to the network.
109 """
110 if label in self.dcs:
111 raise Exception("Data center label already exists: %s" % label)
112 dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
113 dc.net = self # set reference to network
114 self.dcs[label] = dc
115 dc.create() # finally create the data center in our Mininet instance
116 LOG.info("added data center: %s" % label)
117 return dc
118
119 def addLink(self, node1, node2, **params):
120 """
121 Able to handle Datacenter objects as link
122 end points.
123 """
124 assert node1 is not None
125 assert node2 is not None
126 LOG.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
127 # ensure type of node1
128 if isinstance( node1, basestring ):
129 if node1 in self.dcs:
130 node1 = self.dcs[node1].switch
131 if isinstance( node1, Datacenter ):
132 node1 = node1.switch
133 # ensure type of node2
134 if isinstance( node2, basestring ):
135 if node2 in self.dcs:
136 node2 = self.dcs[node2].switch
137 if isinstance( node2, Datacenter ):
138 node2 = node2.switch
139 # try to give containers a default IP
140 if isinstance( node1, Docker ):
141 if "params1" not in params:
142 params["params1"] = {}
143 if "ip" not in params["params1"]:
144 params["params1"]["ip"] = self.getNextIp()
145 if isinstance( node2, Docker ):
146 if "params2" not in params:
147 params["params2"] = {}
148 if "ip" not in params["params2"]:
149 params["params2"]["ip"] = self.getNextIp()
150 # ensure that we allow TCLinks between data centers
151 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
152 # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
153 if "cls" not in params:
154 params["cls"] = TCLink
155
156 link = Containernet.addLink(self, node1, node2, **params)
157
158 # try to give container interfaces a default id
159 node1_port_id = node1.ports[link.intf1]
160 if isinstance(node1, Docker):
161 if "id" in params["params1"]:
162 node1_port_id = params["params1"]["id"]
163 node1_port_name = link.intf1.name
164
165 node2_port_id = node2.ports[link.intf2]
166 if isinstance(node2, Docker):
167 if "id" in params["params2"]:
168 node2_port_id = params["params2"]["id"]
169 node2_port_name = link.intf2.name
170
171
172 # add edge and assigned port number to graph in both directions between node1 and node2
173 # port_id: id given in descriptor (if available, otherwise same as port)
174 # port: portnumber assigned by Containernet
175
176 attr_dict = {}
177 # possible weight metrics allowed by TClink class:
178 weight_metrics = ['bw', 'delay', 'jitter', 'loss']
179 edge_attributes = [p for p in params if p in weight_metrics]
180 for attr in edge_attributes:
181 # if delay: strip ms (need number as weight in graph)
182 match = re.search('([0-9]*\.?[0-9]+)', params[attr])
183 if match:
184 attr_number = match.group(1)
185 else:
186 attr_number = None
187 attr_dict[attr] = attr_number
188
189
190 attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
191 'src_port_name': node1_port_name,
192 'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
193 'dst_port_name': node2_port_name}
194 attr_dict2.update(attr_dict)
195 self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)
196
197 attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
198 'src_port_name': node2_port_name,
199 'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
200 'dst_port_name': node1_port_name}
201 attr_dict2.update(attr_dict)
202 self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)
203
204 return link
205
206 def addDocker( self, label, **params ):
207 """
208 Wrapper for addDocker method to use custom container class.
209 """
210 self.DCNetwork_graph.add_node(label)
211 return Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
212
213 def removeDocker( self, label, **params ):
214 """
215 Wrapper for removeDocker method to update graph.
216 """
217 self.DCNetwork_graph.remove_node(label)
218 return Containernet.removeDocker(self, label, **params)
219
220 def addSwitch( self, name, add_to_graph=True, **params ):
221 """
222 Wrapper for addSwitch method to store switch also in graph.
223 """
224 if add_to_graph:
225 self.DCNetwork_graph.add_node(name)
226 return Containernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
227
228 def getAllContainers(self):
229 """
230 Returns a list with all containers within all data centers.
231 """
232 all_containers = []
233 for dc in self.dcs.itervalues():
234 all_containers += dc.listCompute()
235 return all_containers
236
237 def start(self):
238 # start
239 for dc in self.dcs.itervalues():
240 dc.start()
241 Containernet.start(self)
242
243 def stop(self):
244
245 # stop the monitor agent
246 if self.monitor_agent is not None:
247 self.monitor_agent.stop()
248
249 # stop emulator net
250 Containernet.stop(self)
251
252 # stop Ryu controller
253 self.stopRyu()
254
255
256 def CLI(self):
257 CLI(self)
258
259 # to remove chain do setChain( src, dst, cmd='del-flows')
260 def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
261 cmd = kwargs.get('cmd')
262 if cmd == 'add-flow':
263 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
264 if kwargs.get('bidirectional'):
265 ret = ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
266
267 elif cmd == 'del-flows':
268 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
269 if kwargs.get('bidirectional'):
270 ret = ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
271
272 else:
273 ret = "Command unknown"
274
275 return ret
276
277
278 def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
279
280 src_sw = None
281 dst_sw = None
282 src_sw_inport_nr = 0
283 dst_sw_outport_nr = 0
284
285 LOG.debug("call chainAddFlow vnf_src_name=%r, vnf_src_interface=%r, vnf_dst_name=%r, vnf_dst_interface=%r",
286 vnf_src_name, vnf_src_interface, vnf_dst_name, vnf_dst_interface)
287
288 #check if port is specified (vnf:port)
289 if vnf_src_interface is None:
290 # take first interface by default
291 connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
292 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
293 vnf_src_interface = link_dict[0]['src_port_id']
294
295 for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
296 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
297 for link in link_dict:
298 if (link_dict[link]['src_port_id'] == vnf_src_interface or
299 link_dict[link]['src_port_name'] == vnf_src_interface): # Fix: we might also get interface names, e.g, from a son-emu-cli call
300 # found the right link and connected switch
301 src_sw = connected_sw
302 src_sw_inport_nr = link_dict[link]['dst_port_nr']
303 break
304
305 if vnf_dst_interface is None:
306 # take first interface by default
307 connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
308 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
309 vnf_dst_interface = link_dict[0]['dst_port_id']
310
311 vnf_dst_name = vnf_dst_name.split(':')[0]
312 for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
313 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
314 for link in link_dict:
315 if link_dict[link]['dst_port_id'] == vnf_dst_interface or \
316 link_dict[link]['dst_port_name'] == vnf_dst_interface: # Fix: we might also get interface names, e.g, from a son-emu-cli call
317 # found the right link and connected switch
318 dst_sw = connected_sw
319 dst_sw_outport_nr = link_dict[link]['src_port_nr']
320 break
321
322
323 # get shortest path
324 try:
325 # returns the first found shortest path
326 # if all shortest paths are wanted, use: all_shortest_paths
327 path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
328 except:
329 LOG.exception("No path could be found between {0} and {1} using src_sw={2} and dst_sw={3}".format(
330 vnf_src_name, vnf_dst_name, src_sw, dst_sw))
331 LOG.debug("Graph nodes: %r" % self.DCNetwork_graph.nodes())
332 LOG.debug("Graph edges: %r" % self.DCNetwork_graph.edges())
333 for e, v in self.DCNetwork_graph.edges():
334 LOG.debug("%r" % self.DCNetwork_graph[e][v])
335 return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
336
337 LOG.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
338
339 current_hop = src_sw
340 switch_inport_nr = src_sw_inport_nr
341
342 # choose free vlan if path contains more than 1 switch
343 cmd = kwargs.get('cmd')
344 vlan = None
345 if cmd == 'add-flow':
346 if len(path) > 1:
347 vlan = self.vlans.pop()
348
349 for i in range(0,len(path)):
350 current_node = self.getNodeByName(current_hop)
351
352 if path.index(current_hop) < len(path)-1:
353 next_hop = path[path.index(current_hop)+1]
354 else:
355 #last switch reached
356 next_hop = vnf_dst_name
357
358 next_node = self.getNodeByName(next_hop)
359
360 if next_hop == vnf_dst_name:
361 switch_outport_nr = dst_sw_outport_nr
362 LOG.info("end node reached: {0}".format(vnf_dst_name))
363 elif not isinstance( next_node, OVSSwitch ):
364 LOG.info("Next node: {0} is not a switch".format(next_hop))
365 return "Next node: {0} is not a switch".format(next_hop)
366 else:
367 # take first link between switches by default
368 index_edge_out = 0
369 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
370
371
372 # set of entry via ovs-ofctl
373 if isinstance( current_node, OVSSwitch ):
374 kwargs['vlan'] = vlan
375 kwargs['path'] = path
376 kwargs['current_hop'] = current_hop
377
378 if self.controller == RemoteController:
379 ## set flow entry via ryu rest api
380 self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
381 else:
382 ## set flow entry via ovs-ofctl
383 self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
384
385
386
387 # take first link between switches by default
388 if isinstance( next_node, OVSSwitch ):
389 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
390 current_hop = next_hop
391
392 return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
393
394 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
395 match = 'in_port=%s' % switch_inport_nr
396
397 cookie = kwargs.get('cookie')
398 match_input = kwargs.get('match')
399 cmd = kwargs.get('cmd')
400 path = kwargs.get('path')
401 current_hop = kwargs.get('current_hop')
402 vlan = kwargs.get('vlan')
403
404 s = ','
405 if match_input:
406 match = s.join([match, match_input])
407
408 flow = {}
409 flow['dpid'] = int(node.dpid, 16)
410
411 if cookie:
412 flow['cookie'] = int(cookie)
413
414
415 flow['actions'] = []
416
417 # possible Ryu actions, match fields:
418 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
419 if cmd == 'add-flow':
420 prefix = 'stats/flowentry/add'
421 if vlan != None:
422 if path.index(current_hop) == 0: # first node
423 action = {}
424 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
425 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
426 flow['actions'].append(action)
427 action = {}
428 action['type'] = 'SET_FIELD'
429 action['field'] = 'vlan_vid'
430 action['value'] = vlan
431 flow['actions'].append(action)
432 elif path.index(current_hop) == len(path) - 1: # last node
433 match += ',dl_vlan=%s' % vlan
434 action = {}
435 action['type'] = 'POP_VLAN'
436 flow['actions'].append(action)
437 else: # middle nodes
438 match += ',dl_vlan=%s' % vlan
439 # output action must come last
440 action = {}
441 action['type'] = 'OUTPUT'
442 action['port'] = switch_outport_nr
443 flow['actions'].append(action)
444
445 elif cmd == 'del-flows':
446 prefix = 'stats/flowentry/delete'
447
448 if cookie:
449 # TODO: add cookie_mask as argument
450 flow['cookie_mask'] = int('0xffffffffffffffff', 16) # need full mask to match complete cookie
451
452 action = {}
453 action['type'] = 'OUTPUT'
454 action['port'] = switch_outport_nr
455 flow['actions'].append(action)
456
457 flow['match'] = self._parse_match(match)
458 self.ryu_REST(prefix, data=flow)
459
460 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
461 match = 'in_port=%s' % switch_inport_nr
462
463 cookie = kwargs.get('cookie')
464 match_input = kwargs.get('match')
465 cmd = kwargs.get('cmd')
466 path = kwargs.get('path')
467 current_hop = kwargs.get('current_hop')
468 vlan = kwargs.get('vlan')
469
470 s = ','
471 if cookie:
472 cookie = 'cookie=%s' % cookie
473 match = s.join([cookie, match])
474 if match_input:
475 match = s.join([match, match_input])
476 if cmd == 'add-flow':
477 action = 'action=%s' % switch_outport_nr
478 if vlan != None:
479 if path.index(current_hop) == 0: # first node
480 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
481 match = '-O OpenFlow13 ' + match
482 elif path.index(current_hop) == len(path) - 1: # last node
483 match += ',dl_vlan=%s' % vlan
484 action = 'action=strip_vlan,output=%s' % switch_outport_nr
485 else: # middle nodes
486 match += ',dl_vlan=%s' % vlan
487 ofcmd = s.join([match, action])
488 elif cmd == 'del-flows':
489 ofcmd = match
490 else:
491 ofcmd = ''
492
493 node.dpctl(cmd, ofcmd)
494 LOG.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
495 switch_outport_nr, cmd))
496
497 # start Ryu Openflow controller as Remote Controller for the DCNetwork
498 def startRyu(self, learning_switch=True):
499 # start Ryu controller with rest-API
500 python_install_path = site.getsitepackages()[0]
501 ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
502 ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
503 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
504 # Ryu still uses 6633 as default
505 ryu_option = '--ofp-tcp-listen-port'
506 ryu_of_port = '6653'
507 ryu_cmd = 'ryu-manager'
508 FNULL = open("/tmp/ryu.log", 'w')
509 if learning_switch:
510 self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
511 else:
512 # no learning switch, but with rest api
513 self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
514 time.sleep(1)
515
516 def stopRyu(self):
517 if self.ryu_process is not None:
518 self.ryu_process.terminate()
519 self.ryu_process.kill()
520
521 def ryu_REST(self, prefix, dpid=None, data=None):
522 try:
523 if dpid:
524 url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)
525 else:
526 url = self.ryu_REST_api + '/' + str(prefix)
527 if data:
528 #LOG.info('POST: {0}'.format(str(data)))
529 req = urllib2.Request(url, str(data))
530 else:
531 req = urllib2.Request(url)
532
533 ret = urllib2.urlopen(req).read()
534 return ret
535 except:
536 LOG.info('error url: {0}'.format(str(url)))
537 if data: LOG.info('error POST: {0}'.format(str(data)))
538
539 # need to respect that some match fields must be integers
540 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
541 def _parse_match(self, match):
542 matches = match.split(',')
543 dict = {}
544 for m in matches:
545 match = m.split('=')
546 if len(match) == 2:
547 try:
548 m2 = int(match[1], 0)
549 except:
550 m2 = match[1]
551
552 dict.update({match[0]:m2})
553 return dict
554