cleanup ryu and leftover containers at startup
[osm/vim-emu.git] / src / emuvim / dcemulator / net.py
1 """
2 Copyright (c) 2015 SONATA-NFV and Paderborn University
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28 import logging
29
30 import site
31 import time
32 from subprocess import Popen
33 import os
34 import re
35 import urllib2
36 from functools import partial
37
38 from mininet.net import Containernet
39 from mininet.node import Controller, DefaultController, OVSSwitch, OVSKernelSwitch, Docker, RemoteController
40 from mininet.cli import CLI
41 from mininet.link import TCLink
42 import networkx as nx
43 from emuvim.dcemulator.monitoring import DCNetworkMonitor
44 from emuvim.dcemulator.node import Datacenter, EmulatorCompute
45 from emuvim.dcemulator.resourcemodel import ResourceModelRegistrar
46
47 class DCNetwork(Containernet):
48 """
49 Wraps the original Mininet/Containernet class and provides
50 methods to add data centers, switches, etc.
51
52 This class is used by topology definition scripts.
53 """
54
def __init__(self, controller=RemoteController, monitor=False,
             enable_learning=True,  # in case of RemoteController (Ryu), learning switch behavior can be turned off/on
             dc_emulation_max_cpu=1.0,  # fraction of overall CPU time for emulation
             dc_emulation_max_mem=512,  # emulation max mem in MB
             **kwargs):
    """
    Create an extended version of a Containernet network.

    :param controller: controller class handed to Containernet; if it is
                       RemoteController, a local Ryu instance is started
    :param monitor: if True, a DCNetworkMonitor is attached to this network
    :param enable_learning: start Ryu together with its learning switch app
                            (only evaluated for RemoteController)
    :param dc_emulation_max_cpu: max. CPU time used by containers in data centers
    :param dc_emulation_max_mem: max. memory (in MB) used by containers in data centers
    :param kwargs: pass-through for Mininet parameters
    :return:
    """
    self.dcs = {}

    # make sure any remaining Ryu processes are killed
    self.killRyu()
    # make sure no containers are left over from a previous emulator run.
    self.removeLeftoverContainers()

    # call original Containernet.__init__ and setup default controller
    Containernet.__init__(
        self, switch=OVSKernelSwitch, controller=controller, **kwargs)

    # Ryu management
    self.ryu_process = None
    if controller == RemoteController:
        # start Ryu controller
        self.startRyu(learning_switch=enable_learning)

    # add the specified controller
    self.addController('c0', controller=controller)

    # graph of the complete DC network
    self.DCNetwork_graph = nx.MultiDiGraph()

    # initialize pool of vlan tags to setup the SDN paths.
    # VLAN ids 0 and 4095 are reserved by IEEE 802.1Q and must not be
    # handed out; the list is reversed so that pop() yields 1, 2, 3, ...
    # list() keeps the pool a real (pop-able) list on every Python version.
    self.vlans = list(range(1, 4095))[::-1]

    # link to Ryu REST_API
    ryu_ip = '0.0.0.0'
    ryu_port = '8080'
    self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)

    # monitoring agent
    if monitor:
        self.monitor_agent = DCNetworkMonitor(self)
    else:
        self.monitor_agent = None

    # initialize resource model registrar
    self.rm_registrar = ResourceModelRegistrar(
        dc_emulation_max_cpu, dc_emulation_max_mem)
107
def addDatacenter(self, label, metadata=None, resource_log_path=None):
    """
    Create and add a logical cloud data center to the network.

    :param label: unique name of the data center
    :param metadata: optional dict handed to the Datacenter; a fresh empty
                     dict is used when omitted
    :param resource_log_path: passed through to the Datacenter
    :return: the newly created Datacenter object
    :raises Exception: if a data center with this label already exists
    """
    if label in self.dcs:
        raise Exception("Data center label already exists: %s" % label)
    if metadata is None:
        # never share one default dict between all data centers
        metadata = {}
    dc = Datacenter(label, metadata=metadata, resource_log_path=resource_log_path)
    dc.net = self  # set reference to network
    self.dcs[label] = dc
    dc.create()  # finally create the data center in our Mininet instance
    logging.info("added data center: %s" % label)
    return dc
120
def addLink(self, node1, node2, **params):
    """
    Add a link between two nodes and mirror it in the network graph.

    Able to handle Datacenter objects (or their labels) as link end
    points: they are replaced by the data center's switch. Any other
    string is resolved to the node of that name.

    Docker end points without an explicit IP get the next free one.
    The link is recorded in both directions in DCNetwork_graph together
    with the port ids/numbers/names and the numeric value of the
    'bw', 'delay', 'jitter' and 'loss' parameters (if given).

    :param node1: node, node name, Datacenter or data center label
    :param node2: node, node name, Datacenter or data center label
    :param params: link parameters passed on to Containernet.addLink
    :return: the created link object
    """
    assert node1 is not None
    assert node2 is not None
    logging.debug("addLink: n1=%s n2=%s" % (str(node1), str(node2)))
    # ensure type of node1
    if isinstance(node1, basestring):
        if node1 in self.dcs:
            node1 = self.dcs[node1].switch
        else:
            # plain node name: resolve it here, the node object is needed below
            node1 = self.getNodeByName(node1)
    if isinstance(node1, Datacenter):
        node1 = node1.switch
    # ensure type of node2
    if isinstance(node2, basestring):
        if node2 in self.dcs:
            node2 = self.dcs[node2].switch
        else:
            node2 = self.getNodeByName(node2)
    if isinstance(node2, Datacenter):
        node2 = node2.switch
    # try to give containers a default IP
    if isinstance(node1, Docker):
        if "params1" not in params:
            params["params1"] = {}
        if "ip" not in params["params1"]:
            params["params1"]["ip"] = self.getNextIp()
    if isinstance(node2, Docker):
        if "params2" not in params:
            params["params2"] = {}
        if "ip" not in params["params2"]:
            params["params2"]["ip"] = self.getNextIp()
    # ensure that we allow TCLinks between data centers
    # TODO this is not optimal, we use cls=Link for containers and TCLink for data centers
    # see Containernet issue: https://github.com/mpeuster/containernet/issues/3
    if "cls" not in params:
        params["cls"] = TCLink

    link = Containernet.addLink(self, node1, node2, **params)

    # try to give container interfaces a default id
    # port_id: id given in descriptor (if available, otherwise same as port)
    # port_nr: port number assigned by Containernet
    node1_port_nr = node1.ports[link.intf1]
    node1_port_id = node1_port_nr
    if isinstance(node1, Docker) and "id" in params["params1"]:
        node1_port_id = params["params1"]["id"]
    node1_port_name = link.intf1.name

    node2_port_nr = node2.ports[link.intf2]
    node2_port_id = node2_port_nr
    if isinstance(node2, Docker) and "id" in params["params2"]:
        node2_port_id = params["params2"]["id"]
    node2_port_name = link.intf2.name

    # collect the link metrics usable as edge weights
    # possible weight metrics allowed by TClink class:
    weight_metrics = ['bw', 'delay', 'jitter', 'loss']
    link_weights = {}
    for attr in weight_metrics:
        if attr not in params:
            continue
        # strip units such as "ms"; str() because values like bw=10 are
        # usually given as plain numbers, not strings
        number = re.search(r'([0-9]*\.?[0-9]+)', str(params[attr]))
        # store a real number: the graph weight is used in path computation
        link_weights[attr] = float(number.group(1)) if number else None

    # add edge and assigned port number to graph in both directions between node1 and node2
    forward = {'src_port_id': node1_port_id, 'src_port_nr': node1_port_nr,
               'src_port_name': node1_port_name,
               'dst_port_id': node2_port_id, 'dst_port_nr': node2_port_nr,
               'dst_port_name': node2_port_name}
    forward.update(link_weights)
    # attributes are passed as keywords, which NetworkX 1.x and 2.x both accept
    self.DCNetwork_graph.add_edge(node1.name, node2.name, **forward)

    backward = {'src_port_id': node2_port_id, 'src_port_nr': node2_port_nr,
                'src_port_name': node2_port_name,
                'dst_port_id': node1_port_id, 'dst_port_nr': node1_port_nr,
                'dst_port_name': node1_port_name}
    backward.update(link_weights)
    self.DCNetwork_graph.add_edge(node2.name, node1.name, **backward)

    return link
207
def addDocker(self, label, **params):
    """
    Register the container name as graph node and create the container
    through Containernet, using EmulatorCompute as container class.
    """
    graph = self.DCNetwork_graph
    graph.add_node(label)
    container = Containernet.addDocker(self, label, cls=EmulatorCompute, **params)
    return container
214
def removeDocker(self, label, **params):
    """
    Drop the container's node from the graph, then remove the container
    through Containernet and hand back its result.
    """
    graph = self.DCNetwork_graph
    graph.remove_node(label)
    result = Containernet.removeDocker(self, label, **params)
    return result
221
def addSwitch(self, name, add_to_graph=True, **params):
    """
    Create a switch through Containernet with the OpenFlow 1.0/1.2/1.3
    protocols enabled; unless add_to_graph is false, the switch name is
    first stored as node in the network graph.
    """
    if add_to_graph:
        graph = self.DCNetwork_graph
        graph.add_node(name)
    switch = Containernet.addSwitch(
        self, name, protocols='OpenFlow10,OpenFlow12,OpenFlow13', **params)
    return switch
229
def getAllContainers(self):
    """
    Collect the compute instances reported by listCompute() of every
    registered data center into one flat list.
    """
    containers = []
    for datacenter in self.dcs.itervalues():
        containers.extend(datacenter.listCompute())
    return containers
238
def start(self):
    """
    Start all registered data centers first, then the Containernet
    network itself.
    """
    for datacenter in self.dcs.itervalues():
        datacenter.start()
    Containernet.start(self)
244
def stop(self):
    """
    Shut the emulation down: monitor agent (if any), the emulated
    network and finally the Ryu controller.

    Each later step also runs when an earlier one raises, so a failing
    monitor or network shutdown cannot leave the Ryu process running.
    The first exception is still propagated to the caller.
    """
    try:
        try:
            # stop the monitor agent
            if self.monitor_agent is not None:
                self.monitor_agent.stop()
        finally:
            # stop emulator net
            Containernet.stop(self)
    finally:
        # stop Ryu controller
        self.stopRyu()
256
257
def CLI(self):
    """
    Open the interactive Mininet command line for this network.
    """
    # inside the method body the name CLI refers to the module-level
    # import from mininet.cli, not to this method
    mininet_cli = CLI
    mininet_cli(self)
260
261 # to remove chain do setChain( src, dst, cmd='del-flows')
def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Add or delete the flow entries of a chain between two VNFs.

    The keyword argument 'cmd' selects the operation ('add-flow' or
    'del-flows'); with a true 'bidirectional' keyword the reverse
    direction is processed as well and both result messages are joined
    by a newline. All keywords are forwarded to _chainAddFlow.

    :return: result message(s), or "Command unknown" for any other cmd
    """
    if kwargs.get('cmd') not in ('add-flow', 'del-flows'):
        return "Command unknown"

    # both commands take the same route; _chainAddFlow evaluates 'cmd' itself
    result = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
    if kwargs.get('bidirectional'):
        result = result + '\n' + self._chainAddFlow(
            vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
    return result
278
279
def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
    """
    Install or delete the flow entries along the shortest switch path
    between two VNF interfaces.

    The switches attached to the source and destination interface are
    looked up in DCNetwork_graph, a shortest path between them is
    computed (optionally weighted by the 'weight' keyword) and for every
    switch on that path a flow entry is set, via the Ryu REST API when
    the controller is RemoteController and via dpctl otherwise.
    For 'add-flow' on a path with more than one switch a VLAN tag is
    taken from the pool.

    :param vnf_src_name: name of the source VNF
    :param vnf_dst_name: name of the destination VNF
    :param vnf_src_interface: source interface id (default: first interface)
    :param vnf_dst_interface: destination interface id (default: first interface)
    :param kwargs: 'cmd' ('add-flow'/'del-flows'), 'weight', plus options
                   forwarded to the flow entry helpers ('cookie', 'match')
    :return: a human-readable result or error message
    """
    graph = self.DCNetwork_graph
    no_path_msg = "No path could be found between {0} and {1}"

    # check if port is specified (vnf:port)
    if vnf_src_interface is None:
        # take first interface by default
        connected_sw = list(graph.neighbors(vnf_src_name))[0]
        link_dict = graph[vnf_src_name][connected_sw]
        vnf_src_interface = link_dict[0]['src_port_id']

    src_sw = None
    src_sw_inport_nr = None
    for connected_sw in graph.neighbors(vnf_src_name):
        link_dict = graph[vnf_src_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_src_interface:
                # found the right link and connected switch
                src_sw = connected_sw
                src_sw_inport_nr = link_dict[link]['dst_port_nr']
                break

    if vnf_dst_interface is None:
        # take first interface by default
        connected_sw = list(graph.neighbors(vnf_dst_name))[0]
        link_dict = graph[connected_sw][vnf_dst_name]
        vnf_dst_interface = link_dict[0]['dst_port_id']

    vnf_dst_name = vnf_dst_name.split(':')[0]
    dst_sw = None
    dst_sw_outport_nr = None
    for connected_sw in graph.neighbors(vnf_dst_name):
        link_dict = graph[connected_sw][vnf_dst_name]
        for link in link_dict:
            if link_dict[link]['dst_port_id'] == vnf_dst_interface:
                # found the right link and connected switch
                dst_sw = connected_sw
                dst_sw_outport_nr = link_dict[link]['src_port_nr']
                break

    if src_sw is None or dst_sw is None:
        # no link carries the requested interface id; report it the same
        # way as a missing path instead of tripping over unset variables
        logging.info(no_path_msg.format(vnf_src_name, vnf_dst_name))
        return no_path_msg.format(vnf_src_name, vnf_dst_name)

    # get shortest path
    try:
        # returns the first found shortest path
        # if all shortest paths are wanted, use: all_shortest_paths
        path = nx.shortest_path(graph, src_sw, dst_sw, weight=kwargs.get('weight'))
    except Exception:
        # best effort: any failure of the path search is reported as "no path"
        logging.info(no_path_msg.format(vnf_src_name, vnf_dst_name))
        return no_path_msg.format(vnf_src_name, vnf_dst_name)

    logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))

    switch_inport_nr = src_sw_inport_nr

    # choose free vlan if path contains more than 1 switch
    cmd = kwargs.get('cmd')
    vlan = None
    if cmd == 'add-flow' and len(path) > 1:
        vlan = self.vlans.pop()

    last_index = len(path) - 1
    for hop_index, current_hop in enumerate(path):
        current_node = self.getNodeByName(current_hop)

        if hop_index < last_index:
            next_hop = path[hop_index + 1]
        else:
            # last switch reached
            next_hop = vnf_dst_name

        next_node = self.getNodeByName(next_hop)

        if next_hop == vnf_dst_name:
            switch_outport_nr = dst_sw_outport_nr
            logging.info("end node reached: {0}".format(vnf_dst_name))
        elif not isinstance(next_node, OVSSwitch):
            logging.info("Next node: {0} is not a switch".format(next_hop))
            return "Next node: {0} is not a switch".format(next_hop)
        else:
            # take first link between switches by default
            index_edge_out = 0
            switch_outport_nr = graph[current_hop][next_hop][index_edge_out]['src_port_nr']

        # set flow entry on the current switch
        if isinstance(current_node, OVSSwitch):
            kwargs['vlan'] = vlan
            kwargs['path'] = path
            kwargs['current_hop'] = current_hop

            if self.controller == RemoteController:
                # set flow entry via ryu rest api
                self._set_flow_entry_ryu_rest(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
            else:
                # set flow entry via ovs-ofctl
                self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)

        # take first link between switches by default
        if isinstance(next_node, OVSSwitch):
            switch_inport_nr = graph[current_hop][next_hop][0]['dst_port_nr']

    return "path {2} between {0} and {1}".format(vnf_src_name, vnf_dst_name, cmd)
381
382 def _set_flow_entry_ryu_rest(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
383 match = 'in_port=%s' % switch_inport_nr
384
385 cookie = kwargs.get('cookie')
386 match_input = kwargs.get('match')
387 cmd = kwargs.get('cmd')
388 path = kwargs.get('path')
389 current_hop = kwargs.get('current_hop')
390 vlan = kwargs.get('vlan')
391
392 s = ','
393 if match_input:
394 match = s.join([match, match_input])
395
396 flow = {}
397 flow['dpid'] = int(node.dpid, 16)
398
399 if cookie:
400 flow['cookie'] = int(cookie)
401
402
403 flow['actions'] = []
404
405 # possible Ryu actions, match fields:
406 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#add-a-flow-entry
407 if cmd == 'add-flow':
408 prefix = 'stats/flowentry/add'
409 if vlan != None:
410 if path.index(current_hop) == 0: # first node
411 action = {}
412 action['type'] = 'PUSH_VLAN' # Push a new VLAN tag if a input frame is non-VLAN-tagged
413 action['ethertype'] = 33024 # Ethertype 0x8100(=33024): IEEE 802.1Q VLAN-tagged frame
414 flow['actions'].append(action)
415 action = {}
416 action['type'] = 'SET_FIELD'
417 action['field'] = 'vlan_vid'
418 action['value'] = vlan
419 flow['actions'].append(action)
420 elif path.index(current_hop) == len(path) - 1: # last node
421 match += ',dl_vlan=%s' % vlan
422 action = {}
423 action['type'] = 'POP_VLAN'
424 flow['actions'].append(action)
425 else: # middle nodes
426 match += ',dl_vlan=%s' % vlan
427 # output action must come last
428 action = {}
429 action['type'] = 'OUTPUT'
430 action['port'] = switch_outport_nr
431 flow['actions'].append(action)
432
433 elif cmd == 'del-flows':
434 prefix = 'stats/flowentry/delete'
435
436 if cookie:
437 # TODO: add cookie_mask as argument
438 flow['cookie_mask'] = int('0xffffffffffffffff', 16) # need full mask to match complete cookie
439
440 action = {}
441 action['type'] = 'OUTPUT'
442 action['port'] = switch_outport_nr
443 flow['actions'].append(action)
444
445 flow['match'] = self._parse_match(match)
446 self.ryu_REST(prefix, data=flow)
447
448 def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
449 match = 'in_port=%s' % switch_inport_nr
450
451 cookie = kwargs.get('cookie')
452 match_input = kwargs.get('match')
453 cmd = kwargs.get('cmd')
454 path = kwargs.get('path')
455 current_hop = kwargs.get('current_hop')
456 vlan = kwargs.get('vlan')
457
458 s = ','
459 if cookie:
460 cookie = 'cookie=%s' % cookie
461 match = s.join([cookie, match])
462 if match_input:
463 match = s.join([match, match_input])
464 if cmd == 'add-flow':
465 action = 'action=%s' % switch_outport_nr
466 if vlan != None:
467 if path.index(current_hop) == 0: # first node
468 action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
469 match = '-O OpenFlow13 ' + match
470 elif path.index(current_hop) == len(path) - 1: # last node
471 match += ',dl_vlan=%s' % vlan
472 action = 'action=strip_vlan,output=%s' % switch_outport_nr
473 else: # middle nodes
474 match += ',dl_vlan=%s' % vlan
475 ofcmd = s.join([match, action])
476 elif cmd == 'del-flows':
477 ofcmd = match
478 else:
479 ofcmd = ''
480
481 node.dpctl(cmd, ofcmd)
482 logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
483 switch_outport_nr, cmd))
484
485 # start Ryu Openflow controller as Remote Controller for the DCNetwork
def startRyu(self, learning_switch=True):
    """
    Start a Ryu controller process with the ofctl REST app, optionally
    together with Ryu's simple_switch_13 learning switch app.

    The process handle is stored in self.ryu_process; its output goes to
    /tmp/ryu.log. The method then sleeps for one second.

    :param learning_switch: also load the learning switch app
    """
    # start Ryu controller with rest-API
    python_install_path = site.getsitepackages()[0]
    ryu_path = python_install_path + '/ryu/app/simple_switch_13.py'
    ryu_path2 = python_install_path + '/ryu/app/ofctl_rest.py'
    # change the default Openflow controller port to 6653 (official IANA-assigned port number), as used by Mininet
    # Ryu still uses 6633 as default
    ryu_option = '--ofp-tcp-listen-port'
    ryu_of_port = '6653'
    ryu_cmd = 'ryu-manager'

    ryu_args = [ryu_cmd]
    if learning_switch:
        ryu_args.append(ryu_path)
    # the rest api is always loaded, with or without learning switch
    ryu_args += [ryu_path2, ryu_option, ryu_of_port]

    # the child process keeps its own copy of the log file descriptor,
    # so our handle can (and should) be closed right after the spawn
    with open("/tmp/ryu.log", 'w') as ryu_log:
        self.ryu_process = Popen(ryu_args, stdout=ryu_log, stderr=ryu_log)
    time.sleep(1)
503
def stopRyu(self):
    """
    Stop the Ryu process started by this network (if any) and kill any
    other remaining ryu-manager process.

    The child is waited for so that it does not linger as a zombie, and
    the handle is cleared so that a repeated call does not signal the
    process again.
    """
    if self.ryu_process is not None:
        self.ryu_process.terminate()
        self.ryu_process.kill()
        # reap the child process
        self.ryu_process.wait()
        self.ryu_process = None
    self.killRyu()
509
@staticmethod
def removeLeftoverContainers():
    """
    Force-remove all Docker containers whose name matches "mn.*".

    Blocks until the removal has finished, so that it cannot interfere
    with containers created afterwards.
    """
    # TODO can be more python-based using eg. docker-py?
    cleanup = Popen('docker ps -a -q --filter="name=mn.*" | xargs -r docker rm -f', shell=True)
    cleanup.wait()
514
@staticmethod
def killRyu():
    """
    Kill every process whose command line matches 'ryu-manager'.

    Blocks until pkill has finished, so that a Ryu instance started
    right afterwards cannot be hit by a still-running pkill.
    """
    pkill = Popen(['pkill', '-f', 'ryu-manager'])
    pkill.wait()
518
def ryu_REST(self, prefix, dpid=None, data=None):
    """
    Send a request to the Ryu REST API.

    Without data a GET request is sent, with data a POST request whose
    body is the JSON encoding of data (strings are sent unchanged).

    :param prefix: path of the REST resource below the API root
    :param dpid: optional datapath id appended to the path
    :param data: optional request body (dict/list or ready-made string)
    :return: the response body, or None if the request failed (the
             failure is logged)
    """
    # local import keeps this method self-contained
    import json

    url = self.ryu_REST_api + '/' + str(prefix)
    if dpid:
        url = url + '/' + str(dpid)

    try:
        if data:
            # the REST api expects a JSON body; str(dict) would produce a
            # Python repr (single quotes) instead
            if isinstance(data, basestring):
                body = data
            else:
                body = json.dumps(data)
            req = urllib2.Request(url, body, {'Content-Type': 'application/json'})
        else:
            req = urllib2.Request(url)

        return urllib2.urlopen(req).read()
    except Exception:
        # best effort: report the failed request and carry on
        logging.info('error url: {0}'.format(str(url)))
        if data:
            logging.info('error POST: {0}'.format(str(data)))
        return None
536
537 # need to respect that some match fields must be integers
538 # http://ryu.readthedocs.io/en/latest/app/ofctl_rest.html#description-of-match-and-actions
539 def _parse_match(self, match):
540 matches = match.split(',')
541 dict = {}
542 for m in matches:
543 match = m.split('=')
544 if len(match) == 2:
545 try:
546 m2 = int(match[1], 0)
547 except:
548 m2 = match[1]
549
550 dict.update({match[0]:m2})
551 return dict
552