Merge pull request #163 from stevenvanrossem/master
[osm/vim-emu.git] / src / emuvim / dcemulator / monitoring.py
1 """
2 Copyright (c) 2015 SONATA-NFV
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28
29 import logging
30 import sys
31 from mininet.node import OVSSwitch
32 import ast
33 import time
34 from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
35 pushadd_to_gateway, push_to_gateway, delete_from_gateway
36 import threading
37 from subprocess import Popen
38 import os
39
40
logging.basicConfig(level=logging.INFO)

PUSHGATEWAY_PORT = 9091
# we cannot use port 8080 because ryu-ofrest api is already using that one
CADVISOR_PORT = 8081

# `basestring` only exists on Python 2; fall back to `str` elsewhere so that
# string replies are recognised on either interpreter.
try:
    _STRING_TYPES = basestring
except NameError:
    _STRING_TYPES = str


class DCNetworkMonitor(object):
    """
    Class to read openflow stats from the Ryu controller of the DCNetwork.

    Port and flow counters are polled once per second by two background
    threads, stored in Prometheus gauges and pushed to a Prometheus
    pushgateway. The pushgateway and cAdvisor are started as docker
    containers when the monitor is created; `stop()` ends the polling
    threads and removes those containers again.
    """

    # 1 single monitor job for all metrics of the SDN controller
    _PUSHGATEWAY_JOB = 'sonemu-SDNcontroller'

    def __init__(self, net):
        """
        Create the gauges, start the helper containers and the polling threads.

        :param net: network object; this class uses its `DCNetwork_graph`,
            `getNodeByName()` and `ryu_REST()` members.
        """
        self.net = net

        # pushgateway address
        self.pushgateway = 'localhost:{0}'.format(PUSHGATEWAY_PORT)

        # supported Prometheus metrics
        self.registry = CollectorRegistry()
        label_names = ('vnf_name', 'vnf_interface', 'flow_id')
        self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                          label_names, registry=self.registry)
        self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                          label_names, registry=self.registry)
        self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                        label_names, registry=self.registry)
        self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                        label_names, registry=self.registry)

        self.prom_metrics = {
            'tx_packets': self.prom_tx_packet_count,
            'rx_packets': self.prom_rx_packet_count,
            'tx_bytes': self.prom_tx_byte_count,
            'rx_bytes': self.prom_rx_byte_count,
        }

        # lists of installed metrics to monitor; each entry is a dict with
        # the keys: switch_dpid, vnf_name, vnf_interface, previous_measurement,
        # previous_monitor_time, metric_key, mon_port
        # (entries of flow_metrics additionally carry: cookie)
        self.monitor_lock = threading.Lock()
        self.monitor_flow_lock = threading.Lock()
        self.network_metrics = []
        self.flow_metrics = []

        # helper tools
        # cAdvisor, Prometheus pushgateway are started as external container,
        # to gather monitoring metric in son-emu.
        # They are launched before the threads so that a failing launch does
        # not leave polling threads running behind a half-built object.
        self.pushgateway_process = self.start_PushGateway()
        self.cadvisor_process = self.start_cAdvisor()

        # start monitoring threads
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
        self.monitor_flow_thread.start()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _first_interface(self, vnf_name):
        """Return the src_port_id of link 0 to the first neighbour of the vnf,
        or None when the vnf has no neighbour or no such link."""
        graph = self.net.DCNetwork_graph
        try:
            # list() so that both list- and iterator-returning graphs work
            connected_sw = list(graph.neighbors(vnf_name))[0]
            return graph[vnf_name][connected_sw][0]['src_port_id']
        except (IndexError, KeyError):
            return None

    def _find_link(self, vnf_name, vnf_interface):
        """Return (switch name, dst_port_nr) of the link whose src_port_id is
        vnf_interface, or (None, None) when there is no such link."""
        graph = self.net.DCNetwork_graph
        for connected_sw in graph.neighbors(vnf_name):
            link_dict = graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    return connected_sw, link_dict[link]['dst_port_nr']
        return None, None

    @staticmethod
    def _parse_ryu_reply(ret):
        """Turn a reply of `ryu_REST` into a dict; None for unsupported types."""
        if isinstance(ret, dict):
            return ret
        if isinstance(ret, _STRING_TYPES):
            return ast.literal_eval(ret.rstrip())
        return None

    @staticmethod
    def _update_rate(metric_dict, measurement, uptime):
        """Store the new sample in metric_dict and return the rate since the
        previous sample, or None when there is no usable previous sample."""
        previous_time = metric_dict['previous_monitor_time']
        previous_value = metric_dict['previous_measurement']
        metric_dict['previous_measurement'] = measurement
        metric_dict['previous_monitor_time'] = uptime

        # first measurement, or the uptime did not advance
        if previous_time <= 0 or previous_time >= uptime:
            return None
        return (measurement - previous_value) / float(uptime - previous_time)

    def _push_to_gateway(self):
        """Push the registry to the pushgateway; failures are only logged."""
        try:
            pushadd_to_gateway(self.pushgateway, job=self._PUSHGATEWAY_JOB, registry=self.registry)
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0} {1}".format(type(e).__name__, e))

    def _delete_from_gateway(self):
        """Delete the monitor job from the pushgateway; failures are only logged.

        This removes the complete job: we can only remove from the pushgateway
        by the grouping keys which were defined when pushing, not by the labels
        of individual metrics.
        """
        try:
            delete_from_gateway(self.pushgateway, job=self._PUSHGATEWAY_JOB)
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0} {1}".format(type(e).__name__, e))

    def _remove_labels(self, vnf_name, vnf_interface, flow_id):
        """Remove the (vnf_name, vnf_interface, flow_id) series from every gauge."""
        # NOTE(review): assumes prometheus_client stores label values as
        # strings (stop_metric already relied on the 'None' string) - confirm
        # for the installed client version.
        label_key = (vnf_name, vnf_interface, str(flow_id))
        for collector in self.registry._collectors:
            if label_key in collector._metrics:
                logging.debug('removing labels {0} from {1}'.format(label_key, collector._name))
                collector.remove(*label_key)

    def _stop_helper(self, process, name):
        """Terminate a helper launcher process and remove its docker container."""
        if process is None:
            return
        logging.info('stopping {0} container'.format(name))
        try:
            process.terminate()
            process.kill()
            process.wait()
        except OSError:
            # the launcher process is already gone
            pass
        self._stop_container(name)

    # ------------------------------------------------------------------
    # flow metrics
    # ------------------------------------------------------------------

    # first set some parameters, before measurement can start
    def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
        """
        Register a flow counter to be monitored.

        :param vnf_name: name of the vnf node in the network graph
        :param vnf_interface: interface (src_port_id) of the vnf; the first
            interface is taken when None
        :param metric: key of the metric, 'tx_packets' when None
        :param cookie: flow cookie, also exported as the flow_id label
        :return: a status or error message; None when the vnf is not
            connected to an OVSSwitch
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            vnf_interface = self._first_interface(vnf_name)

        vnf_switch, mon_port = self._find_link(vnf_name, vnf_interface)
        if vnf_switch is None:
            msg = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(msg)
            return msg

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'

            next_node = self.net.getNodeByName(vnf_switch)
            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            flow_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
                'cookie': cookie,
            }

            with self.monitor_flow_lock:
                self.flow_metrics.append(flow_metric)

            msg = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_flow error.")
            return str(ex)

    def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
        """
        Stop monitoring a flow counter registered with `setup_flow`.

        :return: a status message, or an error message when no matching
            entry is registered
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None and metric is not None:
            # take first interface by default
            vnf_interface = self._first_interface(vnf_name)

        # search and remove under the lock, so the polling thread never
        # sees the list while it is being modified
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                if flow_dict['vnf_name'] == vnf_name and flow_dict['vnf_interface'] == vnf_interface \
                        and flow_dict['metric_key'] == metric and flow_dict['cookie'] == cookie:
                    self.flow_metrics.remove(flow_dict)
                    self._remove_labels(vnf_name, vnf_interface, cookie)
                    self._delete_from_gateway()

                    msg = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(
                        vnf_name, vnf_interface, metric, cookie)
                    logging.info(msg)
                    return msg

        return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)

    # ------------------------------------------------------------------
    # port metrics
    # ------------------------------------------------------------------

    # first set some parameters, before measurement can start
    def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
        """
        Register a port counter to be monitored.

        :param vnf_name: name of the vnf node in the network graph
        :param vnf_interface: interface (src_port_id) of the vnf; the first
            interface is taken when None
        :param metric: key of the metric, 'tx_packets' when None
        :return: a status or error message; None when the vnf has more than
            one neighbour or is not connected to an OVSSwitch
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            vnf_interface = self._first_interface(vnf_name)

        link_switch, mon_port = self._find_link(vnf_name, vnf_interface)
        if link_switch is None:
            msg = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(msg)
            return msg

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'

            vnf_switch = list(self.net.DCNetwork_graph.neighbors(str(vnf_name)))

            if len(vnf_switch) > 1:
                logging.info("vnf: {0} has multiple ports".format(vnf_name))
                return
            elif len(vnf_switch) == 0:
                logging.info("vnf: {0} is not connected".format(vnf_name))
                return

            next_node = self.net.getNodeByName(vnf_switch[0])
            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            network_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
            }

            with self.monitor_lock:
                self.network_metrics.append(network_metric)

            msg = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_metric error.")
            return str(ex)

    def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
        """
        Stop monitoring port counters registered with `setup_metric`.

        When both vnf_interface and metric are None, every port metric of
        the vnf is removed; otherwise only the matching entry.

        :return: a status message, or an error message when nothing matched
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None and metric is not None:
            # take first interface by default
            vnf_interface = self._first_interface(vnf_name)

        with self.monitor_lock:
            if vnf_interface is None and metric is None:
                # delete everything from this vnf
                remaining = [entry for entry in self.network_metrics
                             if entry['vnf_name'] != vnf_name]
                if len(remaining) != len(self.network_metrics):
                    # update in place: the polling thread shares this list
                    self.network_metrics[:] = remaining

                    for collector in self.registry._collectors:
                        # iterate over a copy, remove() modifies _metrics
                        for name, interface, flow_id in list(collector._metrics):
                            # port metrics are stored with flow_id 'None';
                            # flow series are left to stop_flow
                            if name == vnf_name and flow_id == 'None':
                                collector.remove(name, interface, flow_id)

                    self._delete_from_gateway()
                    logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
                    return 'Stopped monitoring: {0}'.format(vnf_name)
            else:
                for metric_dict in self.network_metrics:
                    if metric_dict['vnf_name'] == vnf_name \
                            and metric_dict['vnf_interface'] == vnf_interface \
                            and metric_dict['metric_key'] == metric:
                        self.network_metrics.remove(metric_dict)
                        # only this label set is removed, not the whole gauge
                        self._remove_labels(vnf_name, vnf_interface, None)
                        self._delete_from_gateway()

                        msg = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
                        logging.info(msg)
                        return msg

        return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)

    # ------------------------------------------------------------------
    # polling threads
    # ------------------------------------------------------------------

    # get all metrics defined in the list and export it to Prometheus
    def get_flow_metrics(self):
        """Thread body: poll every registered flow metric once per second
        until `start_monitoring` becomes False."""
        while self.start_monitoring:
            with self.monitor_flow_lock:
                for flow_dict in self.flow_metrics:
                    try:
                        self._poll_flow(flow_dict)
                    except Exception:
                        # keep the thread alive; one failing query must not
                        # stop the monitoring of the other flows
                        logging.exception('could not update flow metric {0}'.format(flow_dict))
            time.sleep(1)

    def _poll_flow(self, flow_dict):
        """Query Ryu for the flow stats of one entry and export the result."""
        data = {
            'cookie': flow_dict['cookie'],
            'cookie_mask': flow_dict['cookie'],
        }
        if 'tx' in flow_dict['metric_key']:
            data['match'] = {'in_port': flow_dict['mon_port']}
        elif 'rx' in flow_dict['metric_key']:
            data['out_port'] = flow_dict['mon_port']

        # query Ryu
        ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
        flow_stat_dict = self._parse_ryu_reply(ret)
        logging.debug('received flow stat:{0} '.format(flow_stat_dict))

        self.set_flow_metric(flow_dict, flow_stat_dict)

    def get_network_metrics(self):
        """Thread body: poll the port stats of every monitored switch once
        per second until `start_monitoring` becomes False."""
        while self.start_monitoring:
            with self.monitor_lock:
                # group metrics by dpid to optimize the rest api calls
                dpid_set = set(metric_dict['switch_dpid'] for metric_dict in self.network_metrics)
                for dpid in dpid_set:
                    try:
                        self._poll_ports(dpid)
                    except Exception:
                        # keep the thread alive, see get_flow_metrics
                        logging.exception('could not update port metrics of switch {0}'.format(dpid))
            time.sleep(1)

    def _poll_ports(self, dpid):
        """Query Ryu for the port stats of one switch and export every
        metric registered on that switch."""
        # query Ryu
        ret = self.net.ryu_REST('stats/port', dpid=dpid)
        port_stat_dict = self._parse_ryu_reply(ret)
        if port_stat_dict is None:
            logging.warning('no port stats received for switch {0}'.format(dpid))
            return

        for metric_dict in self.network_metrics:
            if int(metric_dict['switch_dpid']) == int(dpid):
                self.set_network_metric(metric_dict, port_stat_dict)

    # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
    def set_network_metric(self, metric_dict, port_stat_dict):
        """
        Export one port counter taken from a port-stats reply.

        :param metric_dict: entry of `network_metrics`; its previous_* fields
            are updated
        :param port_stat_dict: parsed reply, keyed by the dpid as a string
        :return: the rate since the previous sample, None for the first
            sample, or an error message when the monitored port is missing
        """
        # vnf tx is the datacenter switch rx and vice-versa
        metric_key = self.switch_tx_rx(metric_dict['metric_key'])
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        mon_port = metric_dict['mon_port']

        for port_stat in port_stat_dict.get(str(switch_dpid), []):
            try:
                if int(port_stat['port_no']) != int(mon_port):
                    continue
            except (TypeError, ValueError):
                # a port id that is not a number can not be the monitored port
                continue

            port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
            this_measurement = int(port_stat[metric_key])

            # set prometheus metric
            self.prom_metrics[metric_dict['metric_key']]. \
                labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}). \
                set(this_measurement)
            self._push_to_gateway()

            # also the rate is calculated here, but not used for now
            # (rate can be easily queried from prometheus also)
            return self._update_rate(metric_dict, this_measurement, port_uptime)

        msg = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
        logging.error(msg)
        return msg

    def set_flow_metric(self, metric_dict, flow_stat_dict):
        """
        Export one flow counter taken from a flow-stats reply.

        The byte or packet counts of all flows in the reply are summed.
        Nothing is exported when the reply holds no entry for the switch.

        :param metric_dict: entry of `flow_metrics`
        :param flow_stat_dict: parsed reply, keyed by the dpid as a string,
            or None
        """
        metric_key = metric_dict['metric_key']
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        cookie = metric_dict['cookie']

        if not flow_stat_dict or str(switch_dpid) not in flow_stat_dict:
            logging.warning('no flow stats received for switch {0}'.format(switch_dpid))
            return

        if 'bytes' in metric_key:
            count_field = 'byte_count'
        elif 'packet' in metric_key:
            count_field = 'packet_count'
        else:
            count_field = None

        counter = 0
        if count_field is not None:
            counter = sum(flow_stat[count_field] for flow_stat in flow_stat_dict[str(switch_dpid)])

        self.prom_metrics[metric_key]. \
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}). \
            set(counter)
        self._push_to_gateway()

    # ------------------------------------------------------------------
    # helper containers
    # ------------------------------------------------------------------

    def start_Prometheus(self, port=9090):
        """Start the Prometheus container, published on `port`; returns the Popen object."""
        # prometheus.yml configuration file is located in the same directory as this file
        here = os.path.dirname(os.path.abspath(__file__))
        cmd = ["docker",
               "run",
               "--rm",
               "-p", "{0}:9090".format(port),
               "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(here),
               "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(here),
               "--name", "prometheus",
               "prom/prometheus"
               ]
        logging.info('Start Prometheus container {0}'.format(cmd))
        return Popen(cmd)

    def start_PushGateway(self, port=PUSHGATEWAY_PORT):
        """Start the pushgateway container, published on `port`; returns the Popen object."""
        cmd = ["docker",
               "run",
               "-d",
               "-p", "{0}:9091".format(port),
               "--name", "pushgateway",
               "prom/pushgateway"
               ]

        logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
        return Popen(cmd)

    def start_cAdvisor(self, port=CADVISOR_PORT):
        """Start the cAdvisor container, published on `port`; returns the Popen object."""
        cmd = ["docker",
               "run",
               "--rm",
               "--volume=/:/rootfs:ro",
               "--volume=/var/run:/var/run:rw",
               "--volume=/sys:/sys:ro",
               "--volume=/var/lib/docker/:/var/lib/docker:ro",
               "--publish={0}:8080".format(port),
               "--name=cadvisor",
               "google/cadvisor:latest"
               ]
        logging.info('Start cAdvisor container {0}'.format(cmd))
        return Popen(cmd)

    def stop(self):
        """Stop both polling threads and remove the helper containers."""
        # stop the monitoring threads
        self.start_monitoring = False
        self.monitor_thread.join()
        self.monitor_flow_thread.join()

        # the prometheus container is started outside of son-emu,
        # so only the containers started in __init__ are stopped here
        self._stop_helper(self.pushgateway_process, 'pushgateway')
        self._stop_helper(self.cadvisor_process, 'cadvisor')

    def switch_tx_rx(self, metric=''):
        """Return the metric name with tx and rx swapped."""
        # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
        # so we need to change the metric name to be consistent with the vnf rx or tx
        if 'tx' in metric:
            metric = metric.replace('tx', 'rx')
        elif 'rx' in metric:
            metric = metric.replace('rx', 'tx')

        return metric

    def _stop_container(self, name):
        """Run `docker stop` and then `docker rm` for the named container."""
        cmd = ["docker",
               "stop",
               name]
        Popen(cmd).wait()

        cmd = ["docker",
               "rm",
               name]
        Popen(cmd).wait()
558