Merge pull request #169 from stevenvanrossem/master
[osm/vim-emu.git] / src / emuvim / dcemulator / monitoring.py
1 """
2 Copyright (c) 2015 SONATA-NFV
3 ALL RIGHTS RESERVED.
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
20 permission.
21
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
27 """
28
29 import logging
30 import sys
31 from mininet.node import OVSSwitch
32 import ast
33 import time
34 from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
35 pushadd_to_gateway, push_to_gateway, delete_from_gateway
36 import threading
37 from subprocess import Popen
38 import os
39
40
logging.basicConfig(level=logging.INFO)

PUSHGATEWAY_PORT = 9091
# we cannot use port 8080 because ryu-ofrest api is already using that one
CADVISOR_PORT = 8081

# cookie_mask sent with every Ryu flow-stats query (see get_flow_metrics)
COOKIE_MASK = 0xffffffff

# single pushgateway job under which all SDN-controller metrics are pushed
PUSHGATEWAY_JOB = 'sonemu-SDNcontroller'

# Ryu REST replies may arrive as text; `basestring` only exists on Python 2.
try:
    _STRING_TYPES = (basestring,)  # noqa: F821  (Python 2)
except NameError:  # Python 3
    _STRING_TYPES = (str,)


class DCNetworkMonitor(object):
    """Read OpenFlow port/flow statistics from the Ryu controller of the
    DCNetwork and export them as Prometheus gauges through a pushgateway.

    Two polling threads run while ``start_monitoring`` is True:
    ``get_network_metrics`` (switch port counters, guarded by
    ``monitor_lock``) and ``get_flow_metrics`` (per-cookie flow counters,
    guarded by ``monitor_flow_lock``).

    Each entry of ``network_metrics`` / ``flow_metrics`` is a dict with keys:
        vnf_name, vnf_interface  -- the monitored vnf interface
        switch_dpid              -- datapath id (int) of the attached OVS switch
        mon_port                 -- switch port number the interface is linked to
        metric_key               -- a key of ``prom_metrics`` (e.g. 'tx_packets')
        previous_measurement,
        previous_monitor_time    -- last counter value / port uptime, used for
                                    the rate calculation of network metrics
        cookie                   -- flow cookie (flow_metrics entries only)
    """

    def __init__(self, net):
        """Create the gauges, start the helper containers and the polling threads.

        :param net: the DCNetwork; this class uses its ``DCNetwork_graph``,
                    ``getNodeByName`` and ``ryu_REST`` members.
        """
        self.net = net

        # pushgateway address
        self.pushgateway = 'localhost:{0}'.format(PUSHGATEWAY_PORT)

        # supported Prometheus metrics, all labelled per vnf interface and flow
        self.registry = CollectorRegistry()
        labelnames = ('vnf_name', 'vnf_interface', 'flow_id')
        self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                          labelnames, registry=self.registry)
        self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                          labelnames, registry=self.registry)
        self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                        labelnames, registry=self.registry)
        self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                        labelnames, registry=self.registry)

        self.prom_metrics = {'tx_packets': self.prom_tx_packet_count,
                             'rx_packets': self.prom_rx_packet_count,
                             'tx_bytes': self.prom_tx_byte_count,
                             'rx_bytes': self.prom_rx_byte_count}

        # installed metrics to monitor (entry layout: see class docstring)
        self.monitor_lock = threading.Lock()
        self.monitor_flow_lock = threading.Lock()
        self.network_metrics = []
        self.flow_metrics = []

        # helper tools: cAdvisor and the Prometheus pushgateway run as external
        # containers. They are started before the polling threads so that a
        # failure here (e.g. docker missing) does not leave the non-daemon
        # threads running with nothing able to stop them.
        self.pushgateway_process = self.start_PushGateway()
        self.cadvisor_process = self.start_cAdvisor()

        # start monitoring threads
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
        self.monitor_flow_thread.start()

    # ------------------------------------------------------------------ helpers

    def _default_interface(self, vnf_name):
        """Return the src_port_id of the vnf's first link (edge key 0), or None if it has no links."""
        neighbors = list(self.net.DCNetwork_graph.neighbors(vnf_name))
        if not neighbors:
            return None
        link_dict = self.net.DCNetwork_graph[vnf_name][neighbors[0]]
        return link_dict[0]['src_port_id']

    def _find_vnf_link(self, vnf_name, vnf_interface):
        """Return (switch name, switch port nr) of the link leaving vnf_name:vnf_interface.

        Returns (None, None) when no link of the vnf has that src_port_id.
        """
        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    return connected_sw, link_dict[link]['dst_port_nr']
        return None, None

    def _unsupported_metric(self, metric):
        """Return (and log) an error message if ``metric`` has no gauge, else None.

        Unknown metric keys used to make the polling thread die with a
        KeyError while holding its lock, so they are rejected up front.
        """
        if metric in self.prom_metrics:
            return None
        msg = 'metric {0} not supported, use one of: {1}'.format(
            metric, ', '.join(sorted(self.prom_metrics)))
        logging.error(msg)
        return msg

    @staticmethod
    def _parse_ryu_reply(ret):
        """Normalise a Ryu REST reply to a dict keyed by dpid string, or None.

        Dicts are returned unchanged; text is parsed with ast.literal_eval
        (which only evaluates literals). Anything unparsable or not a dict
        yields None so callers can skip this polling cycle.
        """
        if isinstance(ret, dict):
            return ret
        if isinstance(ret, _STRING_TYPES):
            try:
                parsed = ast.literal_eval(ret.rstrip())
            except (ValueError, SyntaxError):
                logging.warning('could not parse Ryu reply: {0!r}'.format(ret))
                return None
            return parsed if isinstance(parsed, dict) else None
        return None

    @staticmethod
    def _port_matches(port_no, mon_port):
        """Return True if the port number reported by Ryu equals ``mon_port``.

        Non-numeric port numbers (presumably reserved ports that Ryu reports
        symbolically -- TODO confirm) never match instead of raising.
        """
        try:
            return int(port_no) == int(mon_port)
        except (TypeError, ValueError):
            return False

    @staticmethod
    def _update_rate(metric_dict, measurement, uptime):
        """Store a new (counter, uptime) sample in metric_dict and return the rate.

        Returns None when no rate can be computed yet: on the first sample or
        when the port uptime went backwards (counters were reset).
        """
        previous_time = metric_dict['previous_monitor_time']
        if previous_time <= 0 or previous_time >= uptime:
            rate = None
        else:
            rate = (measurement - metric_dict['previous_measurement']) / float(uptime - previous_time)
        metric_dict['previous_measurement'] = measurement
        metric_dict['previous_monitor_time'] = uptime
        return rate

    @staticmethod
    def _remove_label_set(gauge, *labelvalues):
        """Drop one label set from ``gauge``; no-op if gauge is None or the set is unknown.

        gauge.remove() normalises the label values the same way labels() does,
        so int cookies / None flow ids match what was exported. Assumes an
        unknown label set is signalled with KeyError.
        """
        if gauge is None:
            return
        try:
            gauge.remove(*labelvalues)
        except KeyError:
            pass

    def _push_to_pushgateway(self):
        """Push the whole registry to the pushgateway; failures are only logged."""
        try:
            pushadd_to_gateway(self.pushgateway, job=PUSHGATEWAY_JOB, registry=self.registry)
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0}".format(e))

    def _delete_from_pushgateway(self):
        """Delete the SDN-controller job from the pushgateway; failures are only logged.

        The whole job is deleted (the pushgateway can only delete by grouping
        key, not by metric label); the polling threads re-push the metrics
        that are still monitored on their next cycle.
        """
        try:
            delete_from_gateway(self.pushgateway, job=PUSHGATEWAY_JOB)
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0}".format(e))

    # ------------------------------------------------------- flow metrics API

    # first set some parameters, before measurement can start
    def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
        """Start exporting ``metric`` summed over the flows with ``cookie`` at a vnf interface.

        :param vnf_interface: defaults to the vnf's first interface
        :param metric: key of ``prom_metrics``; None means 'tx_packets'
        :param cookie: flow cookie to match (exported as the flow_id label)
        :return: status/error message, or None if the vnf is not attached to an OVS switch
        """
        if vnf_interface is None:
            vnf_interface = self._default_interface(vnf_name)

        vnf_switch, mon_port = self._find_vnf_link(vnf_name, vnf_interface)
        if not vnf_switch:
            msg = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(msg)
            return msg

        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'
        error = self._unsupported_metric(metric)
        if error:
            return error

        try:
            next_node = self.net.getNodeByName(vnf_switch)
            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            flow_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
                'cookie': cookie,
            }
            with self.monitor_flow_lock:
                self.flow_metrics.append(flow_metric)

            msg = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_flow error.")
            return str(ex)

    def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
        """Stop every flow metric matching (vnf_name, vnf_interface, metric, cookie).

        When only ``metric`` is given, the vnf's first interface is assumed.
        :return: status message, or an error message when nothing matched
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None and metric is not None:
            vnf_interface = self._default_interface(vnf_name)

        def matches(flow_dict):
            return (flow_dict['vnf_name'] == vnf_name and
                    flow_dict['vnf_interface'] == vnf_interface and
                    flow_dict['metric_key'] == metric and
                    flow_dict['cookie'] == cookie)

        with self.monitor_flow_lock:
            remaining = [flow_dict for flow_dict in self.flow_metrics if not matches(flow_dict)]
            if len(remaining) == len(self.flow_metrics):
                return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
            # update in place so the list object shared with the poller stays the same
            self.flow_metrics[:] = remaining

            self._remove_label_set(self.prom_metrics.get(metric), vnf_name, vnf_interface, cookie)
            self._delete_from_pushgateway()

        msg = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
        logging.info(msg)
        return msg

    # ---------------------------------------------------- network metrics API

    # first set some parameters, before measurement can start
    def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
        """Start exporting the port counter ``metric`` of a vnf interface.

        The switch port the interface is linked to is monitored; its tx/rx
        counters are swapped so the exported value is seen from the vnf.

        :param vnf_interface: defaults to the vnf's first interface
        :param metric: key of ``prom_metrics``; None means 'tx_packets'
        :return: status/error message, or None if the vnf is not attached to an OVS switch
        """
        if vnf_interface is None:
            vnf_interface = self._default_interface(vnf_name)

        # monitor the switch on which this interface was found (same as setup_flow)
        vnf_switch, mon_port = self._find_vnf_link(vnf_name, vnf_interface)
        if vnf_switch is None:
            msg = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(msg)
            return msg

        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'
        error = self._unsupported_metric(metric)
        if error:
            return error

        try:
            next_node = self.net.getNodeByName(vnf_switch)
            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            network_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
            }
            with self.monitor_lock:
                self.network_metrics.append(network_metric)

            msg = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_metric error.")
            return str(ex)

    def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
        """Stop network metrics of a vnf.

        With neither ``vnf_interface`` nor ``metric`` given, every network
        metric of the vnf is stopped. With only ``metric`` given, the vnf's
        first interface is assumed.
        :return: status message, or an error message when nothing matched
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None and metric is not None:
            vnf_interface = self._default_interface(vnf_name)

        stop_all = vnf_interface is None and metric is None

        def matches(metric_dict):
            if metric_dict['vnf_name'] != vnf_name:
                return False
            return stop_all or (metric_dict['vnf_interface'] == vnf_interface and
                                metric_dict['metric_key'] == metric)

        with self.monitor_lock:
            removed = [metric_dict for metric_dict in self.network_metrics if matches(metric_dict)]
            if not removed:
                return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
            # update in place so the list object shared with the poller stays the same
            self.network_metrics[:] = [metric_dict for metric_dict in self.network_metrics
                                       if not matches(metric_dict)]

            # only the stopped metric's own label set is dropped; network
            # metrics are exported with flow_id None
            for metric_dict in removed:
                self._remove_label_set(self.prom_metrics.get(metric_dict['metric_key']),
                                       vnf_name, metric_dict['vnf_interface'], None)
            self._delete_from_pushgateway()

        if stop_all:
            logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
            return 'Stopped monitoring: {0}'.format(vnf_name)

        msg = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(msg)
        return msg

    # ------------------------------------------------------------ polling loops

    # get all metrics defined in the list and export it to Prometheus
    def get_flow_metrics(self):
        """Thread body: once per second query Ryu flow stats for every flow metric.

        Errors are logged per flow so one failing query neither kills the
        thread nor leaves monitor_flow_lock held.
        """
        while self.start_monitoring:
            with self.monitor_flow_lock:
                for flow_dict in self.flow_metrics:
                    try:
                        data = {'cookie': flow_dict['cookie'], 'cookie_mask': COOKIE_MASK}
                        if 'tx' in flow_dict['metric_key']:
                            data['match'] = {'in_port': flow_dict['mon_port']}
                        elif 'rx' in flow_dict['metric_key']:
                            data['out_port'] = flow_dict['mon_port']

                        # query Ryu
                        ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                        flow_stat_dict = self._parse_ryu_reply(ret)
                        logging.debug('received flow stat:{0} '.format(flow_stat_dict))

                        self.set_flow_metric(flow_dict, flow_stat_dict)
                    except Exception:
                        logging.exception('polling flow metric {0} failed'.format(flow_dict))
            time.sleep(1)

    def get_network_metrics(self):
        """Thread body: once per second query Ryu port stats for every network metric.

        Metrics are grouped by switch dpid so each switch is queried once per
        cycle. Errors are logged per switch so one failing query neither kills
        the thread nor leaves monitor_lock held.
        """
        while self.start_monitoring:
            with self.monitor_lock:
                metrics_by_dpid = {}
                for metric_dict in self.network_metrics:
                    metrics_by_dpid.setdefault(int(metric_dict['switch_dpid']), []).append(metric_dict)

                for dpid, metric_list in metrics_by_dpid.items():
                    try:
                        # query Ryu
                        port_stat_dict = self._parse_ryu_reply(self.net.ryu_REST('stats/port', dpid=dpid))
                        for metric_dict in metric_list:
                            self.set_network_metric(metric_dict, port_stat_dict)
                    except Exception:
                        logging.exception('polling port stats of switch {0} failed'.format(dpid))
            time.sleep(1)

    # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
    def set_network_metric(self, metric_dict, port_stat_dict):
        """Export one port counter from a parsed Ryu port-stats reply and push it.

        Called by the polling thread with monitor_lock held.
        :return: the counter rate per second, None when no rate is available
                 yet (first sample / counter reset), or an error message when
                 the monitored port is not in the reply
        """
        # vnf tx is the datacenter switch rx and vice-versa
        metric_key = self.switch_tx_rx(metric_dict['metric_key'])
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        mon_port = metric_dict['mon_port']

        # a failed REST query yields None; treat it like an empty reply
        port_stats = (port_stat_dict or {}).get(str(switch_dpid), [])
        for port_stat in port_stats:
            if not self._port_matches(port_stat['port_no'], mon_port):
                continue

            port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
            this_measurement = int(port_stat[metric_key])

            # set prometheus metric (network metrics carry no flow id)
            self.prom_metrics[metric_dict['metric_key']].labels(
                vnf_name, vnf_interface, None).set(this_measurement)

            # 1 single monitor job for all metrics of the SDN controller
            self._push_to_pushgateway()

            # also the rate is calculated here, but not used for now
            # (rate can be easily queried from prometheus also)
            return self._update_rate(metric_dict, this_measurement, port_uptime)

        msg = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
        logging.error(msg)
        return msg

    def set_flow_metric(self, metric_dict, flow_stat_dict):
        """Export the byte/packet count summed over all flows in a parsed Ryu flow-stats reply.

        Called by the polling thread with monitor_flow_lock held. Flow
        counters are exported as-is (no tx/rx swap). A missing reply for the
        switch is logged and skipped; an empty flow list exports 0.
        """
        metric_key = metric_dict['metric_key']
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        cookie = metric_dict['cookie']

        if not flow_stat_dict or str(switch_dpid) not in flow_stat_dict:
            logging.warning('no flow stats received for flow {0} on {1}:{2}'.format(
                cookie, vnf_name, vnf_interface))
            return

        if 'bytes' in metric_key:
            counter_key = 'byte_count'
        elif 'packet' in metric_key:
            counter_key = 'packet_count'
        else:
            counter_key = None

        flow_stats = flow_stat_dict[str(switch_dpid)]
        counter = sum(flow_stat[counter_key] for flow_stat in flow_stats) if counter_key else 0

        self.prom_metrics[metric_key].labels(vnf_name, vnf_interface, cookie).set(counter)
        self._push_to_pushgateway()

    # ------------------------------------------------------ helper containers

    def start_Prometheus(self, port=9090):
        """Start a Prometheus container; prometheus.yml and profile.rules are read
        from the directory of this file."""
        here = os.path.dirname(os.path.abspath(__file__))
        cmd = ["docker",
               "run",
               "--rm",
               "-p", "{0}:9090".format(port),
               "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(here),
               "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(here),
               "--name", "prometheus",
               "prom/prometheus"
               ]
        logging.info('Start Prometheus container {0}'.format(cmd))
        return Popen(cmd)

    def start_PushGateway(self, port=PUSHGATEWAY_PORT):
        """Start a detached pushgateway container (removed again by stop())."""
        cmd = ["docker",
               "run",
               "-d",
               "-p", "{0}:9091".format(port),
               "--name", "pushgateway",
               "prom/pushgateway"
               ]

        logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
        return Popen(cmd)

    def start_cAdvisor(self, port=CADVISOR_PORT):
        """Start a cAdvisor container publishing its port 8080 on ``port``."""
        cmd = ["docker",
               "run",
               "--rm",
               "--volume=/:/rootfs:ro",
               "--volume=/var/run:/var/run:rw",
               "--volume=/sys:/sys:ro",
               "--volume=/var/lib/docker/:/var/lib/docker:ro",
               "--publish={0}:8080".format(port),
               "--name=cadvisor",
               "google/cadvisor:latest"
               ]
        logging.info('Start cAdvisor container {0}'.format(cmd))
        return Popen(cmd)

    def stop(self):
        """Stop both polling threads, then the helper containers started in __init__."""
        # stop the monitoring threads
        self.start_monitoring = False
        self.monitor_thread.join()
        self.monitor_flow_thread.join()

        # Prometheus itself is started outside of son-emu now, so only the
        # pushgateway and cAdvisor containers are cleaned up here.
        for name, process in (('pushgateway', self.pushgateway_process),
                              ('cadvisor', self.cadvisor_process)):
            if process is None:
                continue
            logging.info('stopping {0} container'.format(name))
            # the docker client may already have exited (e.g. `docker run -d`)
            if process.poll() is None:
                process.terminate()
                process.kill()
            self._stop_container(name)

    def switch_tx_rx(self, metric=''):
        """Swap 'tx' and 'rx' in a metric name.

        When monitoring vnfs, the tx of the datacenter switch is actually the
        rx of the vnf, so the metric name is changed to stay consistent with
        the vnf's point of view.
        """
        if 'tx' in metric:
            metric = metric.replace('tx', 'rx')
        elif 'rx' in metric:
            metric = metric.replace('rx', 'tx')

        return metric

    def _stop_container(self, name):
        """Run `docker stop` and then `docker rm` on container ``name``, waiting for each."""
        cmd = ["docker",
               "stop",
               name]
        Popen(cmd).wait()

        cmd = ["docker",
               "rm",
               name]
        Popen(cmd).wait()
565