cleanup ryu and leftover containers at startup
[osm/vim-emu.git] / src / emuvim / dcemulator / monitoring.py
1 """
2 Copyright (c) 2015 SONATA-NFV
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28
29 import logging
30 from mininet.node import OVSSwitch
31 import ast
32 import time
33 from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
34 pushadd_to_gateway, push_to_gateway, delete_from_gateway
35 import threading
36 from subprocess import Popen
37 import os
38
39
40 logging.basicConfig(level=logging.INFO)
41
42 """
43 class to read openflow stats from the Ryu controller of the DCNetwork
44 """
45
46 class DCNetworkMonitor():
def __init__(self, net):
    """
    Create the Prometheus metrics and start the two monitoring threads.

    :param net: network object, stored as ``self.net``; the other methods
        read its ``DCNetwork_graph`` and call its ``getNodeByName`` and
        ``ryu_REST`` members.
    """
    self.net = net

    # TODO: these global variables should be part of a config file?
    # Prometheus itself is started outside of son-emu (it used to be
    # addressed here as http://127.0.0.1:9090).
    # The pushgateway is started outside of son-emu as well, and son-emu is
    # started with net=host, so localhost:9091 works. When the sdk is started
    # with docker-compose, 'pushgateway:9091' could be used instead.
    self.pushgateway = 'localhost:9091'

    # supported Prometheus metrics, all kept in a dedicated registry
    self.registry = CollectorRegistry()

    def make_gauge(name, description):
        # every gauge carries the same three labels
        return Gauge(name, description,
                     ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)

    self.prom_tx_packet_count = make_gauge('sonemu_tx_count_packets', 'Total number of packets sent')
    self.prom_rx_packet_count = make_gauge('sonemu_rx_count_packets', 'Total number of packets received')
    self.prom_tx_byte_count = make_gauge('sonemu_tx_count_bytes', 'Total number of bytes sent')
    self.prom_rx_byte_count = make_gauge('sonemu_rx_count_bytes', 'Total number of bytes received')

    # lookup table: metric key -> gauge
    self.prom_metrics = {
        'tx_packets': self.prom_tx_packet_count,
        'rx_packets': self.prom_rx_packet_count,
        'tx_bytes': self.prom_tx_byte_count,
        'rx_bytes': self.prom_rx_byte_count,
    }

    # Lists of installed metrics to monitor. Each entry is a dict that can
    # contain: switch_dpid, vnf_name, vnf_interface, previous_measurement,
    # previous_monitor_time, metric_key and mon_port.
    # Each list is guarded by its own lock.
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()
    self.network_metrics = []
    self.flow_metrics = []

    # start the monitoring threads; both loop while start_monitoring is True
    self.start_monitoring = True

    port_worker = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread = port_worker
    port_worker.start()

    flow_worker = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread = flow_worker
    flow_worker.start()
103
104 # helper tools
105 # cAdvisor, Prometheus pushgateway and DB are started as external container, outside of son-emu
106
107
108 # first set some parameters, before measurement can start
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """
    Register a flow metric of a VNF interface so that the flow monitoring
    loop starts exporting it.

    :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``
    :param vnf_interface: interface (``src_port_id``) of the VNF; when None
        the first link of the first neighbour is used
    :param metric: metric key (default and fallback for None: 'tx_packets')
    :param cookie: cookie stored with the entry and later used to select the flows
    :return: a status/error message string, or None when the node the VNF
        is attached to is not an OVSSwitch
    """
    graph = self.net.DCNetwork_graph
    # list() keeps this working when neighbors() returns an iterator
    neighbors = list(graph.neighbors(vnf_name))

    # check if port is specified (vnf:port)
    if vnf_interface is None and neighbors:
        # take first interface by default
        vnf_interface = graph[vnf_name][neighbors[0]][0]['src_port_id']

    flow_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    # find the link that starts at the requested interface
    vnf_switch = None
    for connected_sw in neighbors:
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                vnf_switch = connected_sw
                flow_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break
        if vnf_switch:
            break

    if not vnf_switch:
        # not inside an except block, so log a plain error (no traceback)
        message = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
        logging.error(message)
        return message

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0
        # the dpid is read as a hexadecimal string
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        # the flow monitoring thread iterates over this list under the same lock
        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        message = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
        logging.info(message)
        return message

    except Exception as ex:
        logging.exception("setup_flow error.")
        # str(ex) instead of ex.message, which only exists on Python 2
        return str(ex)
165
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
    """
    Stop monitoring a flow metric that was registered with setup_flow.

    :param vnf_name: name of the VNF node
    :param vnf_interface: interface of the VNF; when None and a metric is
        given, the first link of the first neighbour is used
    :param metric: metric key of the entry to remove
    :param cookie: cookie of the entry to remove
    :return: a status message, or an error message when no entry matches
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        graph = self.net.DCNetwork_graph
        neighbors = list(graph.neighbors(vnf_name))
        if neighbors:
            vnf_interface = graph[vnf_name][neighbors[0]][0]['src_port_id']

    # 'with' guarantees the lock is released even if the pushgateway call fails
    with self.monitor_flow_lock:
        for flow_dict in self.flow_metrics:
            if flow_dict['vnf_name'] != vnf_name or flow_dict['vnf_interface'] != vnf_interface:
                continue
            if flow_dict['metric_key'] != metric or flow_dict['cookie'] != cookie:
                continue

            self.flow_metrics.remove(flow_dict)

            for collector in self.registry._collectors:
                # remove() normalises the label values itself; a membership
                # test with the raw cookie would miss the stored key when the
                # cookie is not a string
                try:
                    collector.remove(vnf_name, vnf_interface, cookie)
                except KeyError:
                    # this collector has no such label set
                    pass

            # one single job holds all metrics, so the whole group is deleted
            delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

            message = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
            logging.info(message)
            return message

    return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
195
196
197 # first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """
    Register a port metric of a VNF interface so that the port monitoring
    loop starts exporting it.

    :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``
    :param vnf_interface: interface (``src_port_id``) of the VNF; when None
        the first link of the first neighbour is used
    :param metric: metric key (default and fallback for None: 'tx_packets')
    :return: a status/error message string, or None when the VNF has more
        than one neighbour or is not attached to an OVSSwitch
    """
    graph = self.net.DCNetwork_graph
    # list() keeps this working when neighbors() returns an iterator
    neighbors = list(graph.neighbors(vnf_name))

    # check if port is specified (vnf:port)
    if vnf_interface is None and neighbors:
        # take first interface by default
        vnf_interface = graph[vnf_name][neighbors[0]][0]['src_port_id']

    network_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    for connected_sw in neighbors:
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                network_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if 'mon_port' not in network_metric:
        # not inside an except block, so log a plain error (no traceback)
        message = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
        logging.error(message)
        return message

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        # a link was found above, so there is at least one neighbour here
        if len(neighbors) > 1:
            logging.info("vnf: {0} has multiple ports".format(vnf_name))
            return

        next_node = self.net.getNodeByName(neighbors[0])

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0
        # the dpid is read as a hexadecimal string
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        # the port monitoring thread iterates over this list under the same lock
        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        message = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(message)
        return message

    except Exception as ex:
        logging.exception("setup_metric error.")
        # str(ex) instead of ex.message, which only exists on Python 2
        return str(ex)
264
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """
    Stop monitoring port metrics that were registered with setup_metric.

    With a metric given, the single matching entry is removed (the first
    interface of the VNF is used when ``vnf_interface`` is None). With
    neither interface nor metric given, every port metric of the VNF is
    removed.

    :param vnf_name: name of the VNF node
    :param vnf_interface: interface of the VNF, or None
    :param metric: metric key of the entry to remove, or None
    :return: a status message, or an error message when nothing matched
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        graph = self.net.DCNetwork_graph
        neighbors = list(graph.neighbors(vnf_name))
        if neighbors:
            vnf_interface = graph[vnf_name][neighbors[0]][0]['src_port_id']

    # 'with' guarantees the lock is released even if a removal or the
    # pushgateway call raises
    with self.monitor_lock:
        if vnf_interface is None and metric is None:
            # delete everything from this vnf: drop ALL of its entries, not
            # only the first one, otherwise the monitoring loop re-creates
            # the remaining metrics
            kept = [entry for entry in self.network_metrics if entry['vnf_name'] != vnf_name]
            if len(kept) != len(self.network_metrics):
                self.network_metrics[:] = kept
                for collector in self.registry._collectors:
                    # iterate over a copy of the keys: remove() mutates _metrics
                    for name, interface, flow_id in list(collector._metrics):
                        # port metrics are stored with flow_id 'None'; label
                        # sets of flow monitors are left to stop_flow
                        if name == vnf_name and flow_id == 'None':
                            logging.info('3 name:{0} labels:{1} metrics:{2}'.format(
                                collector._name, collector._labelnames, collector._metrics))
                            collector.remove(name, interface, flow_id)

                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
                logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
                return 'Stopped monitoring: {0}'.format(vnf_name)
        else:
            for metric_dict in self.network_metrics:
                if metric_dict['vnf_name'] != vnf_name or metric_dict['vnf_interface'] != vnf_interface:
                    continue
                if metric_dict['metric_key'] != metric:
                    continue

                self.network_metrics.remove(metric_dict)

                # Unregistering the gauge would remove the complete metric
                # with all its labels, so only the label set is removed.
                for collector in self.registry._collectors:
                    logging.info('{0}'.format(collector._metrics.values()))

                    if (vnf_name, vnf_interface, 'None') in collector._metrics:
                        logging.info('2 name:{0} labels:{1} metrics:{2}'.format(
                            collector._name, collector._labelnames, collector._metrics))
                        collector.remove(vnf_name, vnf_interface, 'None')

                # 1 single monitor job for all metrics of the SDN controller:
                # only the grouping key (the job) can be deleted from the
                # pushgateway, not individual metric labels. Removing metrics
                # separately would need a separate grouping key, and probably
                # a different registry too.
                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

                message = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
                logging.info(message)
                return message

    return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
336
337
338
339
340
341 # get all metrics defined in the list and export it to Prometheus
def get_flow_metrics(self):
    """
    Monitoring loop for flow metrics (target of the flow monitoring thread).

    While ``self.start_monitoring`` is true, once per second every entry of
    ``self.flow_metrics`` is looked up via ``self.net.ryu_REST`` and handed
    to ``set_flow_metric``. A failure for one entry is logged and does not
    stop the loop.
    """
    while self.start_monitoring:

        # 'with' releases the lock on every exit path; a thread dying with
        # the lock held would block setup_flow/stop_flow forever
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                try:
                    data = {'cookie': flow_dict['cookie']}

                    if 'tx' in flow_dict['metric_key']:
                        data['match'] = {'in_port': flow_dict['mon_port']}
                    elif 'rx' in flow_dict['metric_key']:
                        data['out_port'] = flow_dict['mon_port']

                    # query Ryu
                    ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                    flow_stat_dict = ast.literal_eval(ret)

                    logging.debug('received flow stat:{0} '.format(flow_stat_dict))
                    self.set_flow_metric(flow_dict, flow_stat_dict)
                except Exception:
                    # keep monitoring the other flows and retry on the next round
                    logging.exception('error while monitoring flow {0}'.format(flow_dict))

        time.sleep(1)
367
def get_network_metrics(self):
    """
    Monitoring loop for port metrics (target of the port monitoring thread).

    While ``self.start_monitoring`` is true, once per second the port
    statistics of every switch that has registered metrics are requested via
    ``self.net.ryu_REST`` and handed to ``set_network_metric``. A failure
    for one switch is logged and does not stop the loop.
    """
    while self.start_monitoring:

        # 'with' releases the lock on every exit path; a thread dying with
        # the lock held would block setup_metric/stop_metric forever
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            dpid_set = set(metric_dict['switch_dpid'] for metric_dict in self.network_metrics)

            for dpid in dpid_set:
                try:
                    # query Ryu
                    ret = self.net.ryu_REST('stats/port', dpid=dpid)
                    port_stat_dict = ast.literal_eval(ret)

                    metric_list = [metric_dict for metric_dict in self.network_metrics
                                   if int(metric_dict['switch_dpid']) == int(dpid)]

                    for metric_dict in metric_list:
                        self.set_network_metric(metric_dict, port_stat_dict)
                except Exception:
                    # keep monitoring the other switches and retry on the next round
                    logging.exception('error while monitoring ports of switch {0}'.format(dpid))

        time.sleep(1)
391
392 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """
    Export one port metric to the pushgateway and update its rate bookkeeping.

    :param metric_dict: entry of ``self.network_metrics``; its
        ``previous_measurement`` and ``previous_monitor_time`` are updated
    :param port_stat_dict: parsed port-stats reply, indexed by the dpid as string
    :return: the rate since the previous sample, None for the first sample
        (or when the port uptime did not advance), or an error message
        string when the monitored port is not in the reply
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = metric_dict['mon_port']

    for port_stat in port_stat_dict[str(switch_dpid)]:
        if int(port_stat['port_no']) != int(mon_port):
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric; positional label values follow the label
        # order vnf_name, vnf_interface, flow_id (port metrics have no flow id)
        self.prom_metrics[metric_dict['metric_key']].\
            labels(vnf_name, vnf_interface, None).\
            set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

        # also the rate is calculated here, but not used for now
        # (rate can be easily queried from prometheus also)
        previous_measurement = metric_dict['previous_measurement']
        previous_monitor_time = metric_dict['previous_monitor_time']
        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime

        if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:
            # first measurement (or counters restarted): only the baseline is
            # stored. The lock is owned by the caller and must not be released
            # here, and the monitoring loop must not be re-entered.
            return None

        time_delta = port_uptime - previous_monitor_time
        return (this_measurement - previous_measurement) / float(time_delta)

    # not inside an except block, so log a plain error (no traceback)
    message = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
    logging.error(message)
    return message
438
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """
    Export one flow metric to the pushgateway.

    The byte or packet counters of all flow entries returned for the switch
    are summed up; an empty reply is exported as 0.

    :param metric_dict: entry of ``self.flow_metrics``
    :param flow_stat_dict: parsed flow-stats reply, indexed by the dpid as string
    """
    metric_key = metric_dict['metric_key']
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    cookie = metric_dict['cookie']

    # pick the counter field once instead of on every flow entry
    if 'bytes' in metric_key:
        counter_field = 'byte_count'
    elif 'packet' in metric_key:
        counter_field = 'packet_count'
    else:
        counter_field = None

    # aggregate all found flow stats
    counter = 0
    if counter_field is not None:
        for flow_stat in flow_stat_dict[str(switch_dpid)]:
            counter += flow_stat[counter_field]

    # positional label values follow the label order vnf_name, vnf_interface, flow_id
    self.prom_metrics[metric_key]. \
        labels(vnf_name, vnf_interface, cookie). \
        set(counter)
    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
470
471
def start_Prometheus(self, port=9090):
    """
    Launch a Prometheus docker container.

    :param port: host port that is mapped to port 9090 of the container
    :return: the Popen object of the ``docker run`` command
    """
    # prometheus.yml configuration file is located in the same directory as this file
    here = os.path.dirname(os.path.abspath(__file__))
    config_volume = "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(here)
    rules_volume = "{0}/profile.rules:/etc/prometheus/profile.rules".format(here)

    cmd = ["docker", "run", "--rm"]
    cmd += ["-p", "{0}:9090".format(port)]
    cmd += ["-v", config_volume]
    cmd += ["-v", rules_volume]
    cmd += ["--name", "prometheus"]
    cmd.append("prom/prometheus")

    logging.info('Start Prometheus container {0}'.format(cmd))
    return Popen(cmd)
485
def start_PushGateway(self, port=9091):
    """
    Launch a detached Prometheus pushgateway docker container.

    :param port: host port that is mapped to port 9091 of the container
    :return: the Popen object of the ``docker run`` command
    """
    port_mapping = "{0}:9091".format(port)
    cmd = ["docker", "run", "-d"]
    cmd.extend(["-p", port_mapping])
    cmd.extend(["--name", "pushgateway"])
    cmd.append("prom/pushgateway")

    logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
    return Popen(cmd)
497
def start_cadvisor(self, port=8090):
    """
    Launch a cAdvisor docker container.

    :param port: host port that is published to port 8080 of the container
    :return: the Popen object of the ``docker run`` command
    """
    host_mounts = [
        "--volume=/:/rootfs:ro",
        "--volume=/var/run:/var/run:rw",
        "--volume=/sys:/sys:ro",
        "--volume=/var/lib/docker/:/var/lib/docker:ro",
    ]
    publish = "--publish={0}:8080".format(port)

    cmd = ["docker", "run", "--rm"] + host_mounts + [publish, "--name=cadvisor", "google/cadvisor:latest"]

    logging.info('Start cAdvisor container {0}'.format(cmd))
    return Popen(cmd)
512
def stop(self):
    """
    Ask both monitoring loops to finish and wait for their threads.
    """
    # stop the monitoring thread
    self.start_monitoring = False

    port_worker = self.monitor_thread
    port_worker.join()

    flow_worker = self.monitor_flow_thread
    flow_worker.join()

    # The containers used for monitoring (prometheus, pushgateway, cadvisor)
    # are now started outside of son-emu, so they are no longer stopped here.
539
def switch_tx_rx(self, metric=''):
    """
    Swap the direction in a metric name ('tx' <-> 'rx').

    When monitoring vnfs, the tx of the datacenter switch is actually the rx
    of the vnf, so the metric name is changed to be consistent with the vnf
    rx or tx.

    :param metric: metric name, e.g. 'tx_packets'
    :return: the name with the direction swapped; unchanged when it contains
        neither 'tx' nor 'rx'
    """
    # 'tx' is checked first, exactly one of the two replacements is applied
    for direction, opposite in (('tx', 'rx'), ('rx', 'tx')):
        if direction in metric:
            return metric.replace(direction, opposite)
    return metric
549
def _stop_container(self, name):
    """
    Stop and then remove the docker container with the given name, waiting
    for each docker command to finish.

    :param name: name of the docker container
    """
    for action in ("stop", "rm"):
        docker_cmd = ["docker", action, name]
        Popen(docker_cmd).wait()
560