allow custom paths for chaining
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28 import logging
29
30 import site
31 import time
32 from subprocess import Popen
33 import re
34 import requests
35
36 from mininet.net import Containernet
37 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
38 from mininet.cli import CLI
39 from mininet.link import TCLink
40 from mininet.clean import cleanup
41 import networkx as nx
42 from emuvim.dcemulator.monitoring import DCNetworkMonitor
43 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
44 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
45
46 LOG = logging.getLogger("dcemulator.net")
47 LOG.setLevel(logging.DEBUG)
48
49 class DCNetwork(Containernet):
50 """
51 Wraps the original Mininet/Containernet class and provides
52 methods to add data centers, switches, etc.
53
54 This class is used by topology definition scripts.
55 """
56
57 def __init__(self, controller=RemoteController, monitor=False,
58 enable_learning = True, # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
59 dc_emulation_max_cpu=1.0, # fraction of overall CPU time for emulation
60 dc_emulation_max_mem=512, # emulation max mem in MB
61 **kwargs):
62 """
63 Create an extended version of a Containernet network
64 :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
65 :param kwargs: path through for Mininet parameters
66 :return:
67 """
68 # members
69 self.dcs = {}
70 self.ryu_process = None
71
72 # always cleanup environment before we start the emulator
73 self.killRyu()
74 cleanup()
75
76 # call original Docker.__init__ and setup default controller
77 Containernet.__init__(
78 self, switch=OVSKernelSwitch, controller=controller, **kwargs)
79
80 # Ryu management
81 if controller == RemoteController:
82 # start Ryu controller
83 self.startRyu(learning_switch=enable_learning)
84
85 # add the specified controller
86 self.addController('c0', controller=controller)
87
88 # graph of the complete DC network
89 self.DCNetwork_graph = nx.MultiDiGraph()
90
91 # initialize pool of vlan tags to setup the SDN paths
92 self.vlans = range(4096)[::-1]
93
94 # link to Ryu REST_API
95 ryu_ip = 'localhost'
96 ryu_port = '8080'
97 self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)
98 self.RyuSession = requests.Session()
99
100 # monitoring agent
101 if monitor:
102 self.monitor_agent = DCNetworkMonitor(self)
103 else:
104 self.monitor_agent = None
105
106 # initialize resource model registrar
107 self.rm_registrar = ResourceModelRegistrar(
108 dc_emulation_max_cpu, dc_emulation_max_mem)
109
110 def addDatacenter(self, label, metadata={}, resource_log_path=None):
111 """
112 Create and add a logical cloud data center to the network.
113 """
114 if label in self.dcs:
115 raise Exception("Data center label already exists: %s" % label)
116 dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
117 dc.net = self # set reference to network
118 self.dcs[label] = dc
119 dc.create() # finally create the data center in our Mininet instance
120 LOG.info("added data center: %s" % label)
121 return dc
122
123 def addLink(self, node1, node2, **params):
124 """
125 Able to handle Datacenter objects as link
126 end points.
127 """
128 assert node1 is not None
129 assert node2 is not None
130 LOG.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
131 # ensure type of node1
132 if isinstance( node1, basestring ):
133 if node1 in self.dcs:
134 node1 = self.dcs[node1].switch
135 if isinstance( node1, Datacenter ):
136 node1 = node1.switch
137 # ensure type of node2
138 if isinstance( node2, basestring ):
139 if node2 in self.dcs:
140 node2 = self.dcs[node2].switch
141 if isinstance( node2, Datacenter ):
142 node2 = node2.switch
143 # try to give containers a default IP
144 if isinstance( node1, Docker ):
145 if "params1" not in params:
146 params["params1"] = {}
147 if "ip" not in params["params1"]:
148 params["params1"]["ip"] = self.getNextIp()
149 if isinstance( node2, Docker ):
150 if "params2" not in params:
151 params["params2"] = {}
152 if "ip" not in params["params2"]:
153 params["params2"]["ip"] = self.getNextIp()
154 # ensure that we allow TCLinks between data centers
155 # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
156 # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
157 if "cls" not in params:
158 params["cls"] = TCLink
159
160 link = Containernet.addLink(self, node1, node2, **params)
161
162 # try to give container interfaces a default id
163 node1_port_id = node1.ports[link.intf1]
164 if isinstance(node1, Docker):
165 if "id" in params["params1"]:
166 node1_port_id = params["params1"]["id"]
167 node1_port_name = link.intf1.name
168
169 node2_port_id = node2.ports[link.intf2]
170 if isinstance(node2, Docker):
171 if "id" in params["params2"]:
172 node2_port_id = params["params2"]["id"]
173 node2_port_name = link.intf2.name
174
175
176 # add edge and assigned port number to graph in both directions between node1 and node2
177 # port_id: id given in descriptor (if available, otherwise same as port)
178 # port: portnumber assigned by Containernet
179
180 attr_dict = {}
181 # possible weight metrics allowed by TClink class:
182 weight_metrics = ['bw', 'delay', 'jitter', 'loss']
183 edge_attributes = [p for p in params if p in weight_metrics]
184 for attr in edge_attributes:
185 # if delay: strip ms (need number as weight in graph)
186 match = re.search('([0-9]*\.?[0-9]+)', params[attr])
187 if match:
188 attr_number = match.group(1)
189 else:
190 attr_number = None
191 attr_dict[attr] = attr_number
192
193
194 attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
195 'src_port_name': node1_port_name,
196 'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
197 'dst_port_name': node2_port_name}
198 attr_dict2.update(attr_dict)
199 self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)
200
201 attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
202 'src_port_name': node2_port_name,
203 'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
204 'dst_port_name': node1_port_name}
205 attr_dict2.update(attr_dict)
206 self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)
207
208 return link
209
210 def addDocker( self, label, **params ):
211 """
212 Wrapper for addDocker method to use custom container class.
213 """
214 self.DCNetwork_graph.add_node(label)
215 return Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
216
217 def removeDocker( self, label, **params ):
218 """
219 Wrapper for removeDocker method to update graph.
220 """
221 self.DCNetwork_graph.remove_node(label)
222 return Containernet.removeDocker(self, label, **params)
223
224 def addSwitch( self, name, add_to_graph=True, **params ):
225 """
226 Wrapper for addSwitch method to store switch also in graph.
227 """
228 if add_to_graph:
229 self.DCNetwork_graph.add_node(name)
230 return Containernet.addSwitch(self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
231
232 def getAllContainers(self):
233 """
234 Returns a list with all containers within all data centers.
235 """
236 all_containers = []
237 for dc in self.dcs.itervalues():
238 all_containers += dc.listCompute()
239 return all_containers
240
241 def start(self):
242 # start
243 for dc in self.dcs.itervalues():
244 dc.start()
245 Containernet.start(self)
246
247 def stop(self):
248
249 # stop the monitor agent
250 if self.monitor_agent is not None:
251 self.monitor_agent.stop()
252
253 # stop emulator net
254 Containernet.stop(self)
255
256 # stop Ryu controller
257 self.killRyu()
258
259
260 def CLI(self):
261 CLI(self)
262
263 def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
264 """
265 Chain 2 vnf interfaces together by installing the flowrules in the switches along their path.
266 Currently the path is found using the default networkx shortest path function.
267 Each chain gets a unique vlan id , so different chains wil not interfere.
268
269 :param vnf_src_name: vnf name (string)
270 :param vnf_dst_name: vnf name (string)
271 :param vnf_src_interface: source interface name (string)
272 :param vnf_dst_interface: destination interface name (string)
273 :param cmd: 'add-flow' (default) to add a chain, 'del-flows' to remove a chain
274 :param cookie: cookie for the installed flowrules (can be used later as identifier for a set of installed chains)
275 :param match: custom match entry to be added to the flowrules (default: only in_port and vlan tag)
276 :param priority: custom flowrule priority
277 :param path: custom path between the two VNFs (list of switches)
278 :return: output log string
279 """
280 cmd = kwargs.get('cmd')
281 if cmd == 'add-flow':
282 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
283 if kwargs.get('bidirectional'):
284 if kwargs.get('path') is not None:
285 kwargs['path'] = list(reversed(kwargs.get('path')))
286 ret = ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
287
288 elif cmd == 'del-flows':
289 ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
290 if kwargs.get('bidirectional'):
291 if kwargs.get('path') is not None:
292 kwargs['path'] = list(reversed(kwargs.get('path')))
293 ret = ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
294
295 else:
296 ret = "Command unknown"
297
298 return ret
299
300
301 def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
302
303 src_sw = None
304 dst_sw = None
305 src_sw_inport_nr = 0
306 dst_sw_outport_nr = 0
307
308 LOG.debug("call chainAddFlow vnf_src_name=%r, vnf_src_interface=%r, vnf_dst_name=%r, vnf_dst_interface=%r",
309 vnf_src_name, vnf_src_interface, vnf_dst_name, vnf_dst_interface)
310
311 #check if port is specified (vnf:port)
312 if vnf_src_interface is None:
313 # take first interface by default
314 connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
315 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
316 vnf_src_interface = link_dict[0]['src_port_id']
317
318 for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
319 link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
320 for link in link_dict:
321 if (link_dict[link]['src_port_id'] == vnf_src_interface or
322 link_dict[link]['src_port_name'] == vnf_src_interface): # Fix: we might also get interface names, e.g, from a son-emu-cli call
323 # found the right link and connected switch
324 src_sw = connected_sw
325 src_sw_inport_nr = link_dict[link]['dst_port_nr']
326 break
327
328 if vnf_dst_interface is None:
329 # take first interface by default
330 connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
331 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
332 vnf_dst_interface = link_dict[0]['dst_port_id']
333
334 vnf_dst_name = vnf_dst_name.split(':')[0]
335 for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
336 link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
337 for link in link_dict:
338 if link_dict[link]['dst_port_id'] == vnf_dst_interface or \
339 link_dict[link]['dst_port_name'] == vnf_dst_interface: # Fix: we might also get interface names, e.g, from a son-emu-cli call
340 # found the right link and connected switch
341 dst_sw = connected_sw
342 dst_sw_outport_nr = link_dict[link]['src_port_nr']
343 break
344
345 path = kwargs.get('path')
346 if path is None:
347 # get shortest path
348 try:
349 # returns the first found shortest path
350 # if all shortest paths are wanted, use: all_shortest_paths
351 path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
352 except:
353 LOG.exception("No path could be found between {0} and {1} using src_sw={2} and dst_sw={3}".format(
354 vnf_src_name, vnf_dst_name, src_sw, dst_sw))
355 LOG.debug("Graph nodes: %r" % self.DCNetwork_graph.nodes())
356 LOG.debug("Graph edges: %r" % self.DCNetwork_graph.edges())
357 for e, v in self.DCNetwork_graph.edges():
358 LOG.debug("%r" % self.DCNetwork_graph[e][v])
359 return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
360
361 LOG.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
362
363 current_hop = src_sw
364 switch_inport_nr = src_sw_inport_nr
365
366 # choose free vlan if path contains more than 1 switch
367 cmd = kwargs.get('cmd')
368 vlan = None
369 if cmd == 'add-flow':
370 if len(path) > 1:
371 vlan = self.vlans.pop()
372
373 for i in range(0,len(path)):
374 current_node = self.getNodeByName(current_hop)
375
376 if i < len(path) - 1:
377 next_hop = path[i + 1]
378 else:
379 # last switch reached
380 next_hop = vnf_dst_name
381
382 next_node = self.getNodeByName(next_hop)
383
384 if next_hop == vnf_dst_name:
385 switch_outport_nr = dst_sw_outport_nr
386 LOG.info("end node reached: {0}".format(vnf_dst_name))
387 elif not isinstance( next_node, OVSSwitch ):
388 LOG.info("Next node: {0} is not a switch".format(next_hop))
389 return "Next node: {0} is not a switch".format(next_hop)
390 else:
391 # take first link between switches by default
392 index_edge_out = 0
393 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
394
395
396 # set of entry via ovs-ofctl
397 if isinstance( current_node, OVSSwitch ):
398 kwargs['vlan'] = vlan
399 kwargs['path'] = path
400 kwargs['pathindex'] = i
401
402 if self.controller == RemoteController:
403 ## set flow entry via ryu rest api
404 self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
405 else:
406 ## set flow entry via ovs-ofctl
407 self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
408
409
410
411 # take first link between switches by default
412 if isinstance( next_node, OVSSwitch ):
413 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
414 current_hop = next_hop
415
416 return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
417
418 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
419 match = 'in_port=%s' % switch_inport_nr
420
421 cookie = kwargs.get('cookie')
422 match_input = kwargs.get('match')
423 cmd = kwargs.get('cmd')
424 path = kwargs.get('path')
425 index = kwargs.get('pathindex')
426
427 vlan = kwargs.get('vlan')
428 priority = kwargs.get('priority')
429
430 s = ','
431 if match_input:
432 match = s.join([match, match_input])
433
434 flow = {}
435 flow['dpid'] = int(node.dpid, 16)
436
437 if cookie:
438 flow['cookie'] = int(cookie)
439 if priority:
440 flow['priority'] = int(priority)
441
442 flow['actions'] = []
443
444 # possible Ryu actions, match fields:
445 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
446 if cmd == 'add-flow':
447 prefix = 'stats/flowentry/add'
448 if vlan != None:
449 if index == 0: # first node
450 action = {}
451 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
452 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
453 flow['actions'].append(action)
454 action = {}
455 action['type'] = 'SET_FIELD'
456 action['field'] = 'vlan_vid'
457 # ryu expects the field to be masked
458 action['value'] = vlan | 0x1000
459 flow['actions'].append(action)
460 elif index == len(path) -1: # last node
461 match += ',dl_vlan=%s' % vlan
462 action = {}
463 action['type'] = 'POP_VLAN'
464 flow['actions'].append(action)
465 else: # middle nodes
466 match += ',dl_vlan=%s' % vlan
467 # output action must come last
468 action = {}
469 action['type'] = 'OUTPUT'
470 action['port'] = switch_outport_nr
471 flow['actions'].append(action)
472
473 elif cmd == 'del-flows':
474 prefix = 'stats/flowentry/delete'
475
476 if cookie:
477 # TODO: add cookie_mask as argument
478 flow['cookie_mask'] = int('0xffffffffffffffff', 16) # need full mask to match complete cookie
479
480 action = {}
481 action['type'] = 'OUTPUT'
482 action['port'] = switch_outport_nr
483 flow['actions'].append(action)
484
485 flow['match'] = self._parse_match(match)
486 self.ryu_REST(prefix, data=flow)
487
488 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
489 match = 'in_port=%s' % switch_inport_nr
490
491 cookie = kwargs.get('cookie')
492 match_input = kwargs.get('match')
493 cmd = kwargs.get('cmd')
494 path = kwargs.get('path')
495 index = kwargs.get('pathindex')
496 vlan = kwargs.get('vlan')
497
498 s = ','
499 if cookie:
500 cookie = 'cookie=%s' % cookie
501 match = s.join([cookie, match])
502 if match_input:
503 match = s.join([match, match_input])
504 if cmd == 'add-flow':
505 action = 'action=%s' % switch_outport_nr
506 if vlan != None:
507 if index == 0: # first node
508 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
509 match = '-O OpenFlow13 ' + match
510 elif index == len(path) - 1: # last node
511 match += ',dl_vlan=%s' % vlan
512 action = 'action=strip_vlan,output=%s' % switch_outport_nr
513 else: # middle nodes
514 match += ',dl_vlan=%s' % vlan
515 ofcmd = s.join([match, action])
516 elif cmd == 'del-flows':
517 ofcmd = match
518 else:
519 ofcmd = ''
520
521 node.dpctl(cmd, ofcmd)
522 LOG.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
523 switch_outport_nr, cmd))
524
525 # start Ryu Openflow controller as Remote Controller for the DCNetwork
526 def startRyu(self, learning_switch=True):
527 # start Ryu controller with rest-API
528 python_install_path = site.getsitepackages()[0]
529 ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
530 ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
531 # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
532 # Ryu still uses 6633 as default
533 ryu_option = '--ofp-tcp-listen-port'
534 ryu_of_port = '6653'
535 ryu_cmd = 'ryu-manager'
536 FNULL = open("/tmp/ryu.log", 'w')
537 if learning_switch:
538 self.ryu_process = Popen([ryu_cmd, ryu_path, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
539 else:
540 # no learning switch, but with rest api
541 self.ryu_process = Popen([ryu_cmd, ryu_path2, ryu_option, ryu_of_port], stdout=FNULL, stderr=FNULL)
542 time.sleep(1)
543
544 def killRyu(self):
545 """
546 Stop the Ryu controller that might be started by son-emu.
547 :return:
548 """
549 # try it nicely
550 if self.ryu_process is not None:
551 self.ryu_process.terminate()
552 self.ryu_process.kill()
553 # ensure its death ;-)
554 Popen(['pkill', '-f', 'ryu-manager'])
555
556 def ryu_REST(self, prefix, dpid=None, data=None):
557
558 if dpid:
559 url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)
560 else:
561 url = self.ryu_REST_api + '/' + str(prefix)
562 if data:
563 req = self.RyuSession.post(url, json=data)
564 else:
565 req = self.RyuSession.get(url)
566
567
568 # do extra logging if status code is not 200 (OK)
569 if req.status_code is not requests.codes.ok:
570 logging.info(
571 'type {0} encoding: {1} text: {2} headers: {3} history: {4}'.format(req.headers['content-type'],
572 req.encoding, req.text,
573 req.headers, req.history))
574 LOG.info('url: {0}'.format(str(url)))
575 if data: LOG.info('POST: {0}'.format(str(data)))
576 LOG.info('status: {0} reason: {1}'.format(req.status_code, req.reason))
577
578
579 if 'json' in req.headers['content-type']:
580 ret = req.json()
581 return ret
582
583 ret = req.text.rstrip()
584 return ret
585
586
587 # need to respect that some match fields must be integers
588 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
589 def _parse_match(self, match):
590 matches = match.split(',')
591 dict = {}
592 for m in matches:
593 match = m.split('=')
594 if len(match) == 2:
595 try:
596 m2 = int(match[1], 0)
597 except:
598 m2 = match[1]
599
600 dict.update({match[0]:m2})
601 return dict
602