Copyright (c) 2015 SONATA-NFV

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.

This work has been performed in the framework of the SONATA project,
funded by the European Commission under Grant number 671517 through
the Horizon 2020 and 5G-PPP programmes. The authors would like to
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
import ast
import logging
import os
import threading
import time
from subprocess import Popen

from mininet.node import OVSSwitch
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
    pushadd_to_gateway, push_to_gateway, delete_from_gateway
# configure the root logger to emit INFO and above
logging.basicConfig(level=logging.INFO)
# Class to read OpenFlow stats from the Ryu controller of the DCNetwork.
47 class DCNetworkMonitor():
48 def __init__(self
, net
):
51 # TODO: these global variables should be part of a config file?
53 # prometheus is started outside of son-emu
54 prometheus_ip = '127.0.0.1'
55 prometheus_port = '9090'
56 self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)
58 # helper variables to calculate the metrics
59 # pushgateway is started outside of son-emu and son-emu is started with net=host
60 # so localhost:9091 works
61 self
.pushgateway
= 'localhost:9091'
62 # when sdk is started with docker-compose, we could use
63 # self.pushgateway = 'pushgateway:9091'
65 # supported Prometheus metrics
66 self
.registry
= CollectorRegistry()
67 self
.prom_tx_packet_count
= Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
68 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
69 self
.prom_rx_packet_count
= Gauge('sonemu_rx_count_packets', 'Total number of packets received',
70 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
71 self
.prom_tx_byte_count
= Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
72 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
73 self
.prom_rx_byte_count
= Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
74 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
76 self
.prom_metrics
={'tx_packets':self
.prom_tx_packet_count
, 'rx_packets':self
.prom_rx_packet_count
,
77 'tx_bytes':self
.prom_tx_byte_count
,'rx_bytes':self
.prom_rx_byte_count
}
79 # list of installed metrics to monitor
80 # each entry can contain this data
86 previous_measurement = 0
87 previous_monitor_time = 0
92 self
.monitor_lock
= threading
.Lock()
93 self
.monitor_flow_lock
= threading
.Lock()
94 self
.network_metrics
= []
95 self
.flow_metrics
= []
97 # start monitoring thread
98 self
.start_monitoring
= True
99 self
.monitor_thread
= threading
.Thread(target
=self
.get_network_metrics
)
100 self
.monitor_thread
.start()
102 self
.monitor_flow_thread
= threading
.Thread(target
=self
.get_flow_metrics
)
103 self
.monitor_flow_thread
.start()
106 # cAdvisor, Prometheus pushgateway and DB are started as external container, outside of son-emu
    # Set up the parameters of a flow metric before measurement can start.
110 def setup_flow(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets', cookie
=0):
114 # check if port is specified (vnf:port)
115 if vnf_interface
is None:
116 # take first interface by default
117 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
118 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
119 vnf_interface
= link_dict
[0]['src_port_id']
121 flow_metric
['vnf_name'] = vnf_name
122 flow_metric
['vnf_interface'] = vnf_interface
125 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
126 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
127 for link
in link_dict
:
128 if link_dict
[link
]['src_port_id'] == vnf_interface
:
129 # found the right link and connected switch
130 vnf_switch
= connected_sw
131 flow_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
135 logging
.exception("vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
))
136 return "vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
)
139 # default port direction to monitor
141 metric
= 'tx_packets'
143 next_node
= self
.net
.getNodeByName(vnf_switch
)
145 if not isinstance(next_node
, OVSSwitch
):
146 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
149 flow_metric
['previous_measurement'] = 0
150 flow_metric
['previous_monitor_time'] = 0
152 flow_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
153 flow_metric
['metric_key'] = metric
154 flow_metric
['cookie'] = cookie
156 self
.monitor_flow_lock
.acquire()
157 self
.flow_metrics
.append(flow_metric
)
158 self
.monitor_flow_lock
.release()
160 logging
.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
161 return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
163 except Exception as ex
:
164 logging
.exception("setup_metric error.")
167 def stop_flow(self
, vnf_name
, vnf_interface
=None, metric
=None, cookie
=0,):
169 # check if port is specified (vnf:port)
170 if vnf_interface
is None and metric
is not None:
171 # take first interface by default
172 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
173 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
174 vnf_interface
= link_dict
[0]['src_port_id']
176 for flow_dict
in self
.flow_metrics
:
177 if flow_dict
['vnf_name'] == vnf_name
and flow_dict
['vnf_interface'] == vnf_interface \
178 and flow_dict
['metric_key'] == metric
and flow_dict
['cookie'] == cookie
:
180 self
.monitor_flow_lock
.acquire()
182 self
.flow_metrics
.remove(flow_dict
)
184 for collector
in self
.registry
._collectors
:
185 if (vnf_name
, vnf_interface
, cookie
) in collector
._metrics
:
186 collector
.remove(vnf_name
, vnf_interface
, cookie
)
188 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
190 self
.monitor_flow_lock
.release()
192 logging
.info('Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
193 return 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
195 return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric
, vnf_name
, vnf_interface
)
    # Set up the parameters of a network (port) metric before measurement can start.
199 def setup_metric(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets'):
203 # check if port is specified (vnf:port)
204 if vnf_interface
is None:
205 # take first interface by default
206 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
207 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
208 vnf_interface
= link_dict
[0]['src_port_id']
210 network_metric
['vnf_name'] = vnf_name
211 network_metric
['vnf_interface'] = vnf_interface
213 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
214 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
215 for link
in link_dict
:
216 if link_dict
[link
]['src_port_id'] == vnf_interface
:
217 # found the right link and connected switch
218 network_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
221 if 'mon_port' not in network_metric
:
222 logging
.exception("vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
))
223 return "vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
)
226 # default port direction to monitor
228 metric
= 'tx_packets'
230 vnf_switch
= self
.net
.DCNetwork_graph
.neighbors(str(vnf_name
))
232 if len(vnf_switch
) > 1:
233 logging
.info("vnf: {0} has multiple ports".format(vnf_name
))
235 elif len(vnf_switch
) == 0:
236 logging
.info("vnf: {0} is not connected".format(vnf_name
))
239 vnf_switch
= vnf_switch
[0]
240 next_node
= self
.net
.getNodeByName(vnf_switch
)
242 if not isinstance(next_node
, OVSSwitch
):
243 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
246 network_metric
['previous_measurement'] = 0
247 network_metric
['previous_monitor_time'] = 0
250 network_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
251 network_metric
['metric_key'] = metric
253 self
.monitor_lock
.acquire()
255 self
.network_metrics
.append(network_metric
)
256 self
.monitor_lock
.release()
259 logging
.info('Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
260 return 'Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
262 except Exception as ex
:
263 logging
.exception("setup_metric error.")
266 def stop_metric(self
, vnf_name
, vnf_interface
=None, metric
=None):
268 # check if port is specified (vnf:port)
269 if vnf_interface
is None and metric
is not None:
270 # take first interface by default
271 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
272 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
273 vnf_interface
= link_dict
[0]['src_port_id']
275 for metric_dict
in self
.network_metrics
:
276 if metric_dict
['vnf_name'] == vnf_name
and metric_dict
['vnf_interface'] == vnf_interface \
277 and metric_dict
['metric_key'] == metric
:
279 self
.monitor_lock
.acquire()
281 self
.network_metrics
.remove(metric_dict
)
283 #this removes the complete metric, all labels...
284 #REGISTRY.unregister(self.prom_metrics[metric_dict['metric_key']])
285 #self.registry.unregister(self.prom_metrics[metric_dict['metric_key']])
287 for collector
in self
.registry
._collectors
:
290 INFO:root:name:sonemu_rx_count_packets
291 labels:('vnf_name', 'vnf_interface')
292 metrics:{(u'tsrc', u'output'): < prometheus_client.core.Gauge
297 logging
.info('{0}'.format(collector
._metrics
.values()))
299 if (vnf_name
, vnf_interface
, 'None') in collector
._metrics
:
300 logging
.info('2 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
302 collector
.remove(vnf_name
, vnf_interface
, 'None')
304 # set values to NaN, prometheus api currently does not support removal of metrics
305 #self.prom_metrics[metric_dict['metric_key']].labels(vnf_name, vnf_interface).set(float('nan'))
307 # this removes the complete metric, all labels...
308 # 1 single monitor job for all metrics of the SDN controller
309 # we can only remove from the pushgateway grouping keys(labels) which we have defined for the add_to_pushgateway
310 # we can not specify labels from the metrics to be removed
311 # if we need to remove the metrics seperatelty, we need to give them a separate grouping key, and probably a diffferent registry also
312 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
314 self
.monitor_lock
.release()
316 logging
.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
317 return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
319 # delete everything from this vnf
320 elif metric_dict
['vnf_name'] == vnf_name
and vnf_interface
is None and metric
is None:
321 self
.monitor_lock
.acquire()
322 self
.network_metrics
.remove(metric_dict
)
323 for collector
in self
.registry
._collectors
:
324 collector_dict
= collector
._metrics
.copy()
325 for name
, interface
, id in collector_dict
:
327 logging
.info('3 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
329 collector
.remove(name
, interface
, 'None')
331 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
332 self
.monitor_lock
.release()
333 logging
.info('Stopped monitoring vnf: {0}'.format(vnf_name
))
334 return 'Stopped monitoring: {0}'.format(vnf_name
)
336 return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric
, vnf_name
, vnf_interface
)
    # Poll all flow metrics defined in the list and export them to Prometheus.
343 def get_flow_metrics(self
):
344 while self
.start_monitoring
:
346 self
.monitor_flow_lock
.acquire()
348 for flow_dict
in self
.flow_metrics
:
351 data
['cookie'] = flow_dict
['cookie']
352 data
['cookie_mask'] = flow_dict
['cookie']
354 if 'tx' in flow_dict
['metric_key']:
355 data
['match'] = {'in_port':flow_dict
['mon_port']}
356 elif 'rx' in flow_dict
['metric_key']:
357 data
['out_port'] = flow_dict
['mon_port']
361 ret
= self
.net
.ryu_REST('stats/flow', dpid
=flow_dict
['switch_dpid'], data
=data
)
362 if isinstance(ret
, dict):
364 elif isinstance(ret
, basestring
):
365 flow_stat_dict
= ast
.literal_eval(ret
.rstrip())
367 flow_stat_dict
= None
369 logging
.debug('received flow stat:{0} '.format(flow_stat_dict
))
371 self
.set_flow_metric(flow_dict
, flow_stat_dict
)
373 self
.monitor_flow_lock
.release()
376 def get_network_metrics(self
):
377 while self
.start_monitoring
:
379 self
.monitor_lock
.acquire()
381 # group metrics by dpid to optimize the rest api calls
382 dpid_list
= [metric_dict
['switch_dpid'] for metric_dict
in self
.network_metrics
]
383 dpid_set
= set(dpid_list
)
385 for dpid
in dpid_set
:
388 ret
= self
.net
.ryu_REST('stats/port', dpid
=dpid
)
389 port_stat_dict
= ast
.literal_eval(ret
)
391 metric_list
= [metric_dict
for metric_dict
in self
.network_metrics
392 if int(metric_dict
['switch_dpid'])==int(dpid
)]
394 for metric_dict
in metric_list
:
395 self
.set_network_metric(metric_dict
, port_stat_dict
)
397 self
.monitor_lock
.release()
    # Set the Prometheus gauge of one listed metric from the parsed Ryu port-stats reply.
401 def set_network_metric(self
, metric_dict
, port_stat_dict
):
402 # vnf tx is the datacenter switch rx and vice-versa
403 metric_key
= self
.switch_tx_rx(metric_dict
['metric_key'])
404 switch_dpid
= metric_dict
['switch_dpid']
405 vnf_name
= metric_dict
['vnf_name']
406 vnf_interface
= metric_dict
['vnf_interface']
407 previous_measurement
= metric_dict
['previous_measurement']
408 previous_monitor_time
= metric_dict
['previous_monitor_time']
409 mon_port
= metric_dict
['mon_port']
411 for port_stat
in port_stat_dict
[str(switch_dpid
)]:
412 if int(port_stat
['port_no']) == int(mon_port
):
413 port_uptime
= port_stat
['duration_sec'] + port_stat
['duration_nsec'] * 10 ** (-9)
414 this_measurement
= int(port_stat
[metric_key
])
416 # set prometheus metric
417 self
.prom_metrics
[metric_dict
['metric_key']].\
418 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': None}).\
419 set(this_measurement
)
421 # 1 single monitor job for all metrics of the SDN controller
422 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
424 # also the rate is calculated here, but not used for now
425 # (rate can be easily queried from prometheus also)
426 if previous_monitor_time
<= 0 or previous_monitor_time
>= port_uptime
:
427 metric_dict
['previous_measurement'] = int(port_stat
[metric_key
])
428 metric_dict
['previous_monitor_time'] = port_uptime
429 # do first measurement
431 self
.monitor_lock
.release()
433 metric_rate
= self
.get_network_metrics()
437 time_delta
= (port_uptime
- metric_dict
['previous_monitor_time'])
438 metric_rate
= (this_measurement
- metric_dict
['previous_measurement']) / float(time_delta
)
440 metric_dict
['previous_measurement'] = this_measurement
441 metric_dict
['previous_monitor_time'] = port_uptime
444 logging
.exception('metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
))
445 return 'metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
)
447 def set_flow_metric(self
, metric_dict
, flow_stat_dict
):
448 # vnf tx is the datacenter switch rx and vice-versa
449 metric_key
= metric_dict
['metric_key']
450 switch_dpid
= metric_dict
['switch_dpid']
451 vnf_name
= metric_dict
['vnf_name']
452 vnf_interface
= metric_dict
['vnf_interface']
453 previous_measurement
= metric_dict
['previous_measurement']
454 previous_monitor_time
= metric_dict
['previous_monitor_time']
455 cookie
= metric_dict
['cookie']
458 for flow_stat
in flow_stat_dict
[str(switch_dpid
)]:
459 if 'bytes' in metric_key
:
460 counter
+= flow_stat
['byte_count']
461 elif 'packet' in metric_key
:
462 counter
+= flow_stat
['packet_count']
464 flow_stat
= flow_stat_dict
[str(switch_dpid
)][0]
465 flow_uptime
= flow_stat
['duration_sec'] + flow_stat
['duration_nsec'] * 10 ** (-9)
467 self
.prom_metrics
[metric_dict
['metric_key']]. \
468 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': cookie
}). \
471 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
473 logging
.warning("Pushgateway not reachable: {0} {1}".format(Exception, e
))
# start_Prometheus(port=9090): builds a container command line `cmd` that publishes
# host `port` onto container port 9090, mounts prometheus.yml and profile.rules from
# the directory of this file into /etc/prometheus/, names the container
# "prometheus", and logs the command.
# NOTE(review): the opening of the `cmd` list (presumably `docker run ...`), the image
# name and the return statement are not visible in this view of the file -- confirm
# against the full source before relying on the exact command.
476 def start_Prometheus(self
, port
=9090):
477 # prometheus.yml configuration file is located in the same directory as this file
481 "-p", "{0}:9090".format(port
),
482 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
483 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
484 "--name", "prometheus",
487 logging
.info('Start Prometheus container {0}'.format(cmd
))
# start_PushGateway(port=9091): builds a container command line `cmd` that publishes
# host `port` onto container port 9091, names the container "pushgateway", and
# logs the command.
# NOTE(review): the opening of the `cmd` list, any further options, the image name
# and the return statement are not visible in this view of the file -- confirm
# against the full source.
490 def start_PushGateway(self
, port
=9091):
494 "-p", "{0}:9091".format(port
),
495 "--name", "pushgateway",
499 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
# start_cadvisor(port=8090): builds a container command line `cmd` for the
# google/cadvisor:latest image that mounts the host root (read-only), /var/run
# (read-write), /sys and /var/lib/docker/ (read-only), publishes host `port` onto
# container port 8080, and logs the command.
# NOTE(review): the opening of the `cmd` list, the option between --publish and the
# image (line 511 of the original), and the return statement are not visible in this
# view of the file -- confirm against the full source.
502 def start_cadvisor(self
, port
=8090):
506 "--volume=/:/rootfs:ro",
507 "--volume=/var/run:/var/run:rw",
508 "--volume=/sys:/sys:ro",
509 "--volume=/var/lib/docker/:/var/lib/docker:ro",
510 "--publish={0}:8080".format(port
),
512 "google/cadvisor:latest"
514 logging
.info('Start cAdvisor container {0}'.format(cmd
))
# Body of the monitor's shutdown method (its `def` line is not visible in this view).
# It clears `start_monitoring` so both polling loops end, then joins the port and flow
# monitoring threads. For each helper container process that is set, it calls
# terminate() and then kill() on the process and removes the container via
# `_stop_container`.
# NOTE(review): prometheus_process, pushgateway_process and cadvisor_process must be
# initialized (e.g. to None) in __init__; otherwise the checks below raise
# AttributeError.
518 # stop the monitoring thread
519 self
.start_monitoring
= False
520 self
.monitor_thread
.join()
521 self
.monitor_flow_thread
.join()
523 # these containers are used for monitoring but are started now outside of son-emu
525 if self.prometheus_process is not None:
526 logging.info('stopping prometheus container')
527 self.prometheus_process.terminate()
528 self.prometheus_process.kill()
529 self._stop_container('prometheus')
531 if self.pushgateway_process is not None:
532 logging.info('stopping pushgateway container')
533 self.pushgateway_process.terminate()
534 self.pushgateway_process.kill()
535 self._stop_container('pushgateway')
537 if self.cadvisor_process is not None:
538 logging.info('stopping cadvisor container')
539 self.cadvisor_process.terminate()
540 self.cadvisor_process.kill()
541 self._stop_container('cadvisor')
544 def switch_tx_rx(self
,metric
=''):
545 # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
546 # so we need to change the metric name to be consistent with the vnf rx or tx
548 metric
= metric
.replace('tx','rx')
550 metric
= metric
.replace('rx','tx')
554 def _stop_container(self
, name
):