add priority option to flow entry
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28 import logging
29
30 import site
31 import time
32 from subprocess import Popen
33 import os
34 import re
35 import urllib2
36 from functools import partial
37
38 from mininet.net import Containernet
39 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
40 from mininet.cli import CLI
41 from mininet.link import TCLink
42 from mininet.clean import cleanup
43 import networkx as nx
44 from emuvim.dcemulator.monitoring import DCNetworkMonitor
45 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
46 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
47
# Module-wide logger of the emulator network. It is pinned to DEBUG so that
# the link/flow setup messages emitted below are never filtered at this level.
LOG = logging.getLogger("dcemulator.net")
LOG.setLevel(logging.DEBUG)
50
51 class DCNetwork(Containernet):
52 """
53 Wraps the original Mininet/Containernet class and provides
54 methods to add data centers, switches, etc.
55
56 This class is used by topology definition scripts.
57 """
58
def __init__(self, controller=RemoteController, monitor=False,
             enable_learning=True,  # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
             dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
             dc_emulation_max_mem=512,  # emulation max mem in MB
             **kwargs):
    """
    Create an extended version of a Containernet network.

    :param controller: controller class added as 'c0'; if it is
                       RemoteController, a local Ryu process is started first
    :param monitor: if True, a DCNetworkMonitor is attached to this network
    :param enable_learning: start Ryu with its learning-switch app (only
                            used when controller is RemoteController)
    :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
    :param dc_emulation_max_mem: max. memory (MB) used by containers in data centers
    :param kwargs: path through for Mininet parameters
    :return:
    """
    # members
    self.dcs = {}
    self.ryu_process = None

    # always cleanup environment before we start the emulator
    self.killRyu()
    cleanup()

    # call original Containernet.__init__ and setup default switch/controller
    Containernet.__init__(
        self, switch=OVSKernelSwitch, controller=controller, **kwargs)

    # Ryu management
    if controller == RemoteController:
        # start Ryu controller
        self.startRyu(learning_switch=enable_learning)

    # add the specified controller
    self.addController('c0', controller=controller)

    # graph of the complete DC network
    self.DCNetwork_graph = nx.MultiDiGraph()

    # initialize pool of vlan tags to setup the SDN paths
    # VID 0 (priority-tagged frame, "no VLAN") and VID 4095 are reserved by
    # IEEE 802.1Q, so only 1..4094 may be handed out. The list is reversed
    # because tags are taken with pop(), i.e. the lowest free tag comes first.
    self.vlans = list(range(1, 4095))[::-1]

    # link to Ryu REST_API
    ryu_ip = '0.0.0.0'
    ryu_port = '8080'
    self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)

    # monitoring agent
    if monitor:
        self.monitor_agent = DCNetworkMonitor(self)
    else:
        self.monitor_agent = None

    # initialize resource model registrar
    self.rm_registrar = ResourceModelRegistrar(
        dc_emulation_max_cpu, dc_emulation_max_mem)
110
def addDatacenter(self, label, metadata=None, resource_log_path=None):
    """
    Create and add a logical cloud data center to the network.

    :param label: unique name of the data center
    :param metadata: optional dict handed to the Datacenter; a fresh empty
                     dict is used when omitted
    :param resource_log_path: handed to the Datacenter unchanged
    :return: the created Datacenter object
    :raises Exception: if a data center with this label already exists
    """
    # never use a shared mutable default: every data center gets its own dict
    if metadata is None:
        metadata = {}
    if label in self.dcs:
        raise Exception("Data center label already exists: %s" % label)
    dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
    dc.net = self  # set reference to network
    self.dcs[label] = dc
    dc.create()  # finally create the data center in our Mininet instance
    LOG.info("added data center: %s" % label)
    return dc
123
def addLink(self, node1, node2, **params):
    """
    Able to handle Datacenter objects as link
    end points.

    Besides creating the link, both directions of the link are stored as
    edges in self.DCNetwork_graph, annotated with port ids, port numbers,
    interface names and the numeric part of the TCLink metrics
    ('bw', 'delay', 'jitter', 'loss') found in params.

    :param node1: node object, Datacenter, or label of a known data center
    :param node2: node object, Datacenter, or label of a known data center
    :param params: link parameters passed on to Containernet.addLink
    :return: the created link
    """
    assert node1 is not None
    assert node2 is not None
    LOG.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
    # ensure type of node1
    if isinstance(node1, basestring):
        if node1 in self.dcs:
            node1 = self.dcs[node1].switch
    if isinstance(node1, Datacenter):
        node1 = node1.switch
    # ensure type of node2
    if isinstance(node2, basestring):
        if node2 in self.dcs:
            node2 = self.dcs[node2].switch
    if isinstance(node2, Datacenter):
        node2 = node2.switch
    # try to give containers a default IP
    if isinstance(node1, Docker):
        if "params1" not in params:
            params["params1"] = {}
        if "ip" not in params["params1"]:
            params["params1"]["ip"] = self.getNextIp()
    if isinstance(node2, Docker):
        if "params2" not in params:
            params["params2"] = {}
        if "ip" not in params["params2"]:
            params["params2"]["ip"] = self.getNextIp()
    # ensure that we allow TCLinks between data centers
    # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
    # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
    if "cls" not in params:
        params["cls"] = TCLink

    link = Containernet.addLink(self, node1, node2, **params)

    # try to give container interfaces a default id
    node1_port_id = node1.ports[link.intf1]
    if isinstance(node1, Docker):
        if "id" in params["params1"]:
            node1_port_id = params["params1"]["id"]
    node1_port_name = link.intf1.name

    node2_port_id = node2.ports[link.intf2]
    if isinstance(node2, Docker):
        if "id" in params["params2"]:
            node2_port_id = params["params2"]["id"]
    node2_port_name = link.intf2.name

    # add edge and assigned port number to graph in both directions between node1 and node2
    # port_id: id given in descriptor (if available, otherwise same as port)
    # port: portnumber assigned by Containernet

    attr_dict = {}
    # possible weight metrics allowed by TClink class:
    weight_metrics = ['bw', 'delay', 'jitter', 'loss']
    edge_attributes = [p for p in params if p in weight_metrics]
    for attr in edge_attributes:
        # if delay: strip ms (need number as weight in graph)
        # str() is required because metrics may be given as plain numbers
        # (e.g. bw=10), on which re.search would raise a TypeError
        match = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
        if match:
            attr_number = match.group(1)
        else:
            attr_number = None
        # NOTE(review): the value is stored as the matched string, not as a
        # float -- confirm that consumers of the edge weights expect this
        attr_dict[attr] = attr_number

    attr_dict2 = {'src_port_id': node1_port_id, 'src_port_nr': node1.ports[link.intf1],
                  'src_port_name': node1_port_name,
                  'dst_port_id': node2_port_id, 'dst_port_nr': node2.ports[link.intf2],
                  'dst_port_name': node2_port_name}
    attr_dict2.update(attr_dict)
    self.DCNetwork_graph.add_edge(node1.name, node2.name, attr_dict=attr_dict2)

    attr_dict2 = {'src_port_id': node2_port_id, 'src_port_nr': node2.ports[link.intf2],
                  'src_port_name': node2_port_name,
                  'dst_port_id': node1_port_id, 'dst_port_nr': node1.ports[link.intf1],
                  'dst_port_name': node1_port_name}
    attr_dict2.update(attr_dict)
    self.DCNetwork_graph.add_edge(node2.name, node1.name, attr_dict=attr_dict2)

    return link
210
def addDocker(self, label, **params):
    """
    Add a container of the custom EmulatorCompute class and register
    its label as a node in the network graph.
    """
    graph = self.DCNetwork_graph
    graph.add_node(label)
    container = Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
    return container
217
def removeDocker(self, label, **params):
    """
    Remove a container and drop its node from the network graph.
    """
    graph = self.DCNetwork_graph
    graph.remove_node(label)
    result = Containernet.removeDocker(self, label, **params)
    return result
224
def addSwitch(self, name, add_to_graph=True, **params):
    """
    Add a switch that speaks OpenFlow 1.0, 1.2 and 1.3 and, unless
    add_to_graph is False, store it as a node in the network graph.
    """
    supported_protocols = 'OpenFlow10,OpenFlow12,OpenFlow13'
    if add_to_graph:
        self.DCNetwork_graph.add_node(name)
    return Containernet.addSwitch(self, name, protocols=supported_protocols, **params)
232
def getAllContainers(self):
    """
    Collect the compute instances of every registered data center
    into one flat list.
    """
    containers = []
    for datacenter in self.dcs.itervalues():
        containers.extend(datacenter.listCompute())
    return containers
241
def start(self):
    """
    Start every registered data center, then the emulated network itself.
    """
    for datacenter in self.dcs.itervalues():
        datacenter.start()
    Containernet.start(self)
247
def stop(self):
    """
    Shut down in order: monitor agent (if any), emulated network,
    Ryu controller.
    """
    agent = self.monitor_agent
    if agent is not None:
        agent.stop()

    # the network goes down before its controller
    Containernet.stop(self)
    self.killRyu()
259
260
def CLI(self):
    """
    Run the Mininet CLI (mininet.cli.CLI) on this network.
    """
    CLI(self)
263
264 # to remove chain do setChain( src, dst, cmd='del-flows')
def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Add ('add-flow') or remove ('del-flows') a chain between two VNFs.

    The command is read from kwargs['cmd']; with kwargs['bidirectional']
    set, the reverse direction is processed as well and both status
    messages are joined by a newline. Any other command yields
    "Command unknown".
    """
    command = kwargs.get('cmd')
    # both supported commands take the same route through _chainAddFlow
    if command != 'add-flow' and command != 'del-flows':
        return "Command unknown"

    status = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
    if kwargs.get('bidirectional'):
        status = status + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
    return status
281
282
def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Process one direction of a chain: look up the switches the two VNF
    interfaces are attached to, compute the shortest path between them and
    set (or delete) a flow entry on every switch of that path.

    :param vnf_src_name: graph node name of the source VNF
    :param vnf_dst_name: graph node name of the destination VNF; a
                         "name:port" value is reduced to "name"
    :param vnf_src_interface: port id or interface name at the source;
                              the first link is used if None
    :param vnf_dst_interface: port id or interface name at the destination;
                              the first link is used if None
    :param kwargs: 'cmd' ('add-flow' or 'del-flows'), optional 'weight'
                   (edge attribute used for the shortest path); all entries
                   are forwarded to the flow-entry helpers
    :return: status message (string)
    """
    src_sw = None
    dst_sw = None
    src_sw_inport_nr = 0
    dst_sw_outport_nr = 0

    LOG.debug("call chainAddFlow vnf_src_name=%r, vnf_src_interface=%r, vnf_dst_name=%r, vnf_dst_interface=%r",
              vnf_src_name, vnf_src_interface, vnf_dst_name, vnf_dst_interface)

    # the destination may be given as "vnf:port"; reduce it to the node name
    # before the graph is queried for the first time
    vnf_dst_name = vnf_dst_name.split(':')[0]

    # check if port is specified (vnf:port)
    if vnf_src_interface is None:
        # take first interface by default
        connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
        link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
        vnf_src_interface = link_dict[0]['src_port_id']

    for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
        link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
        for link in link_dict:
            # we might also get interface names, e.g, from a son-emu-cli call
            if (link_dict[link]['src_port_id'] == vnf_src_interface or
                    link_dict[link]['src_port_name'] == vnf_src_interface):
                # found the right link and connected switch
                src_sw = connected_sw
                src_sw_inport_nr = link_dict[link]['dst_port_nr']
                break

    if vnf_dst_interface is None:
        # take first interface by default
        connected_sw = self.DCNetwork_graph.neighbors(vnf_dst_name)[0]
        link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
        vnf_dst_interface = link_dict[0]['dst_port_id']

    for connected_sw in self.DCNetwork_graph.neighbors(vnf_dst_name):
        link_dict = self.DCNetwork_graph[connected_sw][vnf_dst_name]
        for link in link_dict:
            # we might also get interface names, e.g, from a son-emu-cli call
            if (link_dict[link]['dst_port_id'] == vnf_dst_interface or
                    link_dict[link]['dst_port_name'] == vnf_dst_interface):
                # found the right link and connected switch
                dst_sw = connected_sw
                dst_sw_outport_nr = link_dict[link]['src_port_nr']
                break

    # An interface that matched no link leaves its switch at None. This must
    # not reach nx.shortest_path: called with a None end point it does not
    # raise but returns a dict of paths, which cannot be walked below.
    if src_sw is None or dst_sw is None:
        LOG.error("No switch found for {0}:{1} (src_sw={2}) or {3}:{4} (dst_sw={5})".format(
            vnf_src_name, vnf_src_interface, src_sw, vnf_dst_name, vnf_dst_interface, dst_sw))
        return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    # get shortest path
    try:
        # returns the first found shortest path
        # if all shortest paths are wanted, use: all_shortest_paths
        path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
    except Exception:
        LOG.exception("No path could be found between {0} and {1} using src_sw={2} and dst_sw={3}".format(
            vnf_src_name, vnf_dst_name, src_sw, dst_sw))
        LOG.debug("Graph nodes: %r" % self.DCNetwork_graph.nodes())
        LOG.debug("Graph edges: %r" % self.DCNetwork_graph.edges())
        for e, v in self.DCNetwork_graph.edges():
            LOG.debug("%r" % self.DCNetwork_graph[e][v])
        return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)

    LOG.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

    current_hop = src_sw
    switch_inport_nr = src_sw_inport_nr

    # choose free vlan if path contains more than 1 switch
    cmd = kwargs.get('cmd')
    vlan = None
    if cmd == 'add-flow':
        if len(path) > 1:
            vlan = self.vlans.pop()

    last_index = len(path) - 1
    # current_hop is always path[i]: it starts at src_sw == path[0] and is
    # advanced to path[i + 1] at the end of every iteration
    for i in range(0, len(path)):
        current_node = self.getNodeByName(current_hop)

        if i < last_index:
            next_hop = path[i + 1]
        else:
            # last switch reached
            next_hop = vnf_dst_name

        next_node = self.getNodeByName(next_hop)

        if next_hop == vnf_dst_name:
            switch_outport_nr = dst_sw_outport_nr
            LOG.info("end node reached: {0}".format(vnf_dst_name))
        elif not isinstance(next_node, OVSSwitch):
            LOG.info("Next node: {0} is not a switch".format(next_hop))
            return "Next node: {0} is not a switch".format(next_hop)
        else:
            # take first link between switches by default
            index_edge_out = 0
            switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']

        # set of entry via ovs-ofctl
        if isinstance(current_node, OVSSwitch):
            # per-hop context read by the flow-entry helpers
            kwargs['vlan'] = vlan
            kwargs['path'] = path
            kwargs['current_hop'] = current_hop

            if self.controller == RemoteController:
                # set flow entry via ryu rest api
                self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
            else:
                # set flow entry via ovs-ofctl
                self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)

        # take first link between switches by default
        if isinstance(next_node, OVSSwitch):
            switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
            current_hop = next_hop

    return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
398
def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
    """
    Add or delete one flow entry on a switch through the Ryu REST API.

    :param node: switch node; its dpid (hex string) addresses the switch
    :param switch_inport_nr: port number used for the in_port match
    :param switch_outport_nr: port number used for the OUTPUT action
    :param kwargs: 'cmd' ('add-flow' or 'del-flows'), optional 'cookie',
                   'priority', 'match' (extra "key=value,..." match string),
                   and the per-hop context 'vlan', 'path', 'current_hop'
    """
    match = 'in_port=%s' % switch_inport_nr

    cookie = kwargs.get('cookie')
    match_input = kwargs.get('match')
    cmd = kwargs.get('cmd')
    path = kwargs.get('path')
    current_hop = kwargs.get('current_hop')
    vlan = kwargs.get('vlan')
    priority = kwargs.get('priority')

    s = ','
    if match_input:
        match = s.join([match, match_input])

    flow = {}
    flow['dpid'] = int(node.dpid, 16)

    if cookie:
        flow['cookie'] = int(cookie)
    if priority:
        flow['priority'] = int(priority)

    flow['actions'] = []

    # possible Ryu actions, match fields:
    # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
    if cmd == 'add-flow':
        prefix = 'stats/flowentry/add'
        if vlan is not None:
            if path.index(current_hop) == 0:  # first node
                action = {}
                action['type'] = 'PUSH_VLAN'  # Push a new VLAN tag if a input frame is non-VLAN-tagged
                action['ethertype'] = 33024  # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
                flow['actions'].append(action)
                action = {}
                action['type'] = 'SET_FIELD'
                action['field'] = 'vlan_vid'
                action['value'] = vlan
                flow['actions'].append(action)
            elif path.index(current_hop) == len(path) - 1:  # last node
                match += ',dl_vlan=%s' % vlan
                action = {}
                action['type'] = 'POP_VLAN'
                flow['actions'].append(action)
            else:  # middle nodes
                match += ',dl_vlan=%s' % vlan
        # output action must come last
        action = {}
        action['type'] = 'OUTPUT'
        action['port'] = switch_outport_nr
        flow['actions'].append(action)

    elif cmd == 'del-flows':
        prefix = 'stats/flowentry/delete'

        if cookie:
            # TODO: add cookie_mask as argument
            flow['cookie_mask'] = int('0xffffffffffffffff', 16)  # need full mask to match complete cookie

        action = {}
        action['type'] = 'OUTPUT'
        action['port'] = switch_outport_nr
        flow['actions'].append(action)

    else:
        # no REST endpoint is known for any other command; without this
        # branch 'prefix' would be unbound below
        LOG.warning("unknown flow command %r for switch %s, no flow entry sent", cmd, node.name)
        return

    flow['match'] = self._parse_match(match)
    self.ryu_REST(prefix, data=flow)
466
def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
    """
    Add or delete one flow entry on a switch through node.dpctl (ovs-ofctl).

    :param node: switch node providing dpctl() and name
    :param switch_inport_nr: port number used for the in_port match
    :param switch_outport_nr: port number the matched traffic is sent to
    :param kwargs: 'cmd' ('add-flow' or 'del-flows'), optional 'cookie',
                   'priority' (only applied to 'add-flow'), 'match' (extra
                   match string), and the per-hop context 'vlan', 'path',
                   'current_hop'
    """
    match = 'in_port=%s' % switch_inport_nr

    cookie = kwargs.get('cookie')
    match_input = kwargs.get('match')
    cmd = kwargs.get('cmd')
    path = kwargs.get('path')
    current_hop = kwargs.get('current_hop')
    vlan = kwargs.get('vlan')
    priority = kwargs.get('priority')

    s = ','
    if cookie:
        cookie = 'cookie=%s' % cookie
        match = s.join([cookie, match])
    if match_input:
        match = s.join([match, match_input])
    if cmd == 'add-flow':
        # honour the priority option here as the Ryu REST variant does;
        # it is not added for del-flows, whose command string is only the match
        if priority:
            match = s.join([match, 'priority=%s' % priority])
        action = 'action=%s' % switch_outport_nr
        if vlan is not None:
            if path.index(current_hop) == 0:  # first node
                action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
                match = '-O OpenFlow13 ' + match
            elif path.index(current_hop) == len(path) - 1:  # last node
                match += ',dl_vlan=%s' % vlan
                action = 'action=strip_vlan,output=%s' % switch_outport_nr
            else:  # middle nodes
                match += ',dl_vlan=%s' % vlan
        ofcmd = s.join([match, action])
    elif cmd == 'del-flows':
        ofcmd = match
    else:
        ofcmd = ''

    node.dpctl(cmd, ofcmd)
    LOG.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
                                                                    switch_outport_nr, cmd))
503
504 # start Ryu Openflow controller as Remote Controller for the DCNetwork
def startRyu(self, learning_switch=True):
    """
    Start ryu-manager with the REST app (ofctl_rest) as a child process and
    keep its handle in self.ryu_process. Its output goes to /tmp/ryu.log.

    :param learning_switch: additionally load the simple_switch_13 app
    """
    # start Ryu controller with rest-API
    python_install_path = site.getsitepackages()[0]
    ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
    ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
    # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
    # Ryu still uses 6633 as default
    ryu_option = '--ofp-tcp-listen-port'
    ryu_of_port = '6653'
    ryu_cmd = 'ryu-manager'

    ryu_args = [ryu_cmd]
    if learning_switch:
        ryu_args.append(ryu_path)
    # without the learning switch only the rest api is loaded
    ryu_args += [ryu_path2, ryu_option, ryu_of_port]

    # the child gets its own copy of the descriptor, so the parent's handle
    # can (and should) be closed right after the process is spawned
    with open("/tmp/ryu.log", 'w') as ryu_log:
        self.ryu_process = Popen(ryu_args, stdout=ryu_log, stderr=ryu_log)
    time.sleep(1)
522
def killRyu(self):
    """
    Stop the Ryu controller that might be started by son-emu.
    :return:
    """
    # try it nicely
    if self.ryu_process is not None:
        self.ryu_process.terminate()
        self.ryu_process.kill()
        # reap the child so that it does not stay around as a zombie, and
        # forget the handle so that a second call does not signal a dead pid
        self.ryu_process.wait()
        self.ryu_process = None
    # ensure its death ;-)
    # wait for pkill to finish: running it in the background could kill a
    # ryu-manager that is started right after this method returns
    Popen(['pkill', '-f', 'ryu-manager']).wait()
534
def ryu_REST(self, prefix, dpid=None, data=None):
    """
    Send a request to the Ryu REST API and return the response body.

    :param prefix: REST path below the API root, e.g. 'stats/flowentry/add'
    :param dpid: optional datapath id appended to the path
    :param data: optional payload; if given (and non-empty) it is sent as
                 request body (POST), otherwise a plain GET is issued
    :return: response body, or None if the request failed
    """
    # build the url outside the try block so that it is always available
    # to the error log below
    url = self.ryu_REST_api + '/' + str(prefix)
    if dpid:
        url = url + '/' + str(dpid)

    try:
        if data:
            # NOTE(review): the body is the Python repr of data, not JSON --
            # confirm that the Ryu version in use accepts this
            req = urllib2.Request(url, str(data))
        else:
            req = urllib2.Request(url)

        ret = urllib2.urlopen(req).read()
        return ret
    except Exception:
        # best effort: a failed request is logged, not raised
        LOG.info('error url: {0}'.format(str(url)))
        if data:
            LOG.info('error POST: {0}'.format(str(data)))
        return None
552
553 # need to respect that some match fields must be integers
554 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
555 def _parse_match(self, match):
556 matches = match.split(',')
557 dict = {}
558 for m in matches:
559 match = m.split('=')
560 if len(match) == 2:
561 try:
562 m2 = int(match[1], 0)
563 except:
564 m2 = match[1]
565
566 dict.update({match[0]:m2})
567 return dict
568