2e1e0418eeb586890c2039845ad8596448b6de3a
"""
Copyright (c) 2015 SONATA-NFV

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.

This work has been performed in the framework of the SONATA project,
funded by the European Commission under Grant number 671517 through
the Horizon 2020 and 5G-PPP programmes. The authors would like to
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
"""
import ast
import logging
import os
import threading
import time
from subprocess import Popen

from mininet.node import OVSSwitch
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
    pushadd_to_gateway, push_to_gateway, delete_from_gateway
logging.basicConfig(level=logging.INFO)

"""
class to read openflow stats from the Ryu controller of the DCNetwork
"""

# port used to build the push gateway address and as default published port of its container
PUSHGATEWAY_PORT = 9091
class DCNetworkMonitor():
    """Polls port and flow counters through the network's Ryu REST helper and
    exports them as Prometheus gauges via a push gateway.
    """

    def __init__(self, net):
        """Create the gauges, start both polling threads and launch the helper
        containers.

        :param net: network object; the methods of this class use its
            ``DCNetwork_graph``, ``getNodeByName`` and ``ryu_REST`` members.
        """
        # NOTE(review): this assignment is not legible in the reviewed text,
        # but every other method reads self.net -- confirm against VCS.
        self.net = net

        # address of the Prometheus push gateway
        self.pushgateway = 'localhost:{0}'.format(PUSHGATEWAY_PORT)

        # supported Prometheus metrics
        self.registry = CollectorRegistry()
        label_names = ['vnf_name', 'vnf_interface', 'flow_id']
        self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                          label_names, registry=self.registry)
        self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                          label_names, registry=self.registry)
        self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                        label_names, registry=self.registry)
        self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                        label_names, registry=self.registry)

        # lookup from the metric key used in the metric dicts to its gauge
        self.prom_metrics = {'tx_packets': self.prom_tx_packet_count, 'rx_packets': self.prom_rx_packet_count,
                             'tx_bytes': self.prom_tx_byte_count, 'rx_bytes': self.prom_rx_byte_count}

        # list of installed metrics to monitor
        # each entry is a dict that can contain this data:
        #   switch_dpid, vnf_name, vnf_interface, metric_key, mon_port,
        #   previous_measurement = 0
        #   previous_monitor_time = 0
        self.monitor_lock = threading.Lock()
        self.monitor_flow_lock = threading.Lock()
        self.network_metrics = []
        self.flow_metrics = []

        # start monitoring thread
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
        self.monitor_flow_thread.start()

        # cAdvisor, Prometheus pushgateway are started as external container, to gather monitoring metric in son-emu
        self.start_PushGateway()
        self.start_cAdvisor()
103 # first set some parameters, before measurement can start
104 def setup_flow(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets', cookie
=0):
108 # check if port is specified (vnf:port)
109 if vnf_interface
is None:
110 # take first interface by default
111 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
112 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
113 vnf_interface
= link_dict
[0]['src_port_id']
115 flow_metric
['vnf_name'] = vnf_name
116 flow_metric
['vnf_interface'] = vnf_interface
119 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
120 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
121 for link
in link_dict
:
122 if link_dict
[link
]['src_port_id'] == vnf_interface
:
123 # found the right link and connected switch
124 vnf_switch
= connected_sw
125 flow_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
129 logging
.exception("vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
))
130 return "vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
)
133 # default port direction to monitor
135 metric
= 'tx_packets'
137 next_node
= self
.net
.getNodeByName(vnf_switch
)
139 if not isinstance(next_node
, OVSSwitch
):
140 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
143 flow_metric
['previous_measurement'] = 0
144 flow_metric
['previous_monitor_time'] = 0
146 flow_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
147 flow_metric
['metric_key'] = metric
148 flow_metric
['cookie'] = cookie
150 self
.monitor_flow_lock
.acquire()
151 self
.flow_metrics
.append(flow_metric
)
152 self
.monitor_flow_lock
.release()
154 logging
.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
155 return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
157 except Exception as ex
:
158 logging
.exception("setup_metric error.")
161 def stop_flow(self
, vnf_name
, vnf_interface
=None, metric
=None, cookie
=0,):
163 # check if port is specified (vnf:port)
164 if vnf_interface
is None and metric
is not None:
165 # take first interface by default
166 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
167 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
168 vnf_interface
= link_dict
[0]['src_port_id']
170 for flow_dict
in self
.flow_metrics
:
171 if flow_dict
['vnf_name'] == vnf_name
and flow_dict
['vnf_interface'] == vnf_interface \
172 and flow_dict
['metric_key'] == metric
and flow_dict
['cookie'] == cookie
:
174 self
.monitor_flow_lock
.acquire()
176 self
.flow_metrics
.remove(flow_dict
)
178 for collector
in self
.registry
._collectors
:
179 if (vnf_name
, vnf_interface
, cookie
) in collector
._metrics
:
180 collector
.remove(vnf_name
, vnf_interface
, cookie
)
182 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
184 self
.monitor_flow_lock
.release()
186 logging
.info('Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
187 return 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
189 return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric
, vnf_name
, vnf_interface
)
192 # first set some parameters, before measurement can start
193 def setup_metric(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets'):
197 # check if port is specified (vnf:port)
198 if vnf_interface
is None:
199 # take first interface by default
200 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
201 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
202 vnf_interface
= link_dict
[0]['src_port_id']
204 network_metric
['vnf_name'] = vnf_name
205 network_metric
['vnf_interface'] = vnf_interface
207 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
208 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
209 for link
in link_dict
:
210 if link_dict
[link
]['src_port_id'] == vnf_interface
:
211 # found the right link and connected switch
212 network_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
215 if 'mon_port' not in network_metric
:
216 logging
.exception("vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
))
217 return "vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
)
220 # default port direction to monitor
222 metric
= 'tx_packets'
224 vnf_switch
= self
.net
.DCNetwork_graph
.neighbors(str(vnf_name
))
226 if len(vnf_switch
) > 1:
227 logging
.info("vnf: {0} has multiple ports".format(vnf_name
))
229 elif len(vnf_switch
) == 0:
230 logging
.info("vnf: {0} is not connected".format(vnf_name
))
233 vnf_switch
= vnf_switch
[0]
234 next_node
= self
.net
.getNodeByName(vnf_switch
)
236 if not isinstance(next_node
, OVSSwitch
):
237 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
240 network_metric
['previous_measurement'] = 0
241 network_metric
['previous_monitor_time'] = 0
244 network_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
245 network_metric
['metric_key'] = metric
247 self
.monitor_lock
.acquire()
249 self
.network_metrics
.append(network_metric
)
250 self
.monitor_lock
.release()
253 logging
.info('Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
254 return 'Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
256 except Exception as ex
:
257 logging
.exception("setup_metric error.")
260 def stop_metric(self
, vnf_name
, vnf_interface
=None, metric
=None):
262 # check if port is specified (vnf:port)
263 if vnf_interface
is None and metric
is not None:
264 # take first interface by default
265 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
266 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
267 vnf_interface
= link_dict
[0]['src_port_id']
269 for metric_dict
in self
.network_metrics
:
270 if metric_dict
['vnf_name'] == vnf_name
and metric_dict
['vnf_interface'] == vnf_interface \
271 and metric_dict
['metric_key'] == metric
:
273 self
.monitor_lock
.acquire()
275 self
.network_metrics
.remove(metric_dict
)
277 #this removes the complete metric, all labels...
278 #REGISTRY.unregister(self.prom_metrics[metric_dict['metric_key']])
279 #self.registry.unregister(self.prom_metrics[metric_dict['metric_key']])
281 for collector
in self
.registry
._collectors
:
284 INFO:root:name:sonemu_rx_count_packets
285 labels:('vnf_name', 'vnf_interface')
286 metrics:{(u'tsrc', u'output'): < prometheus_client.core.Gauge
291 logging
.info('{0}'.format(collector
._metrics
.values()))
293 if (vnf_name
, vnf_interface
, 'None') in collector
._metrics
:
294 logging
.info('2 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
296 collector
.remove(vnf_name
, vnf_interface
, 'None')
298 # set values to NaN, prometheus api currently does not support removal of metrics
299 #self.prom_metrics[metric_dict['metric_key']].labels(vnf_name, vnf_interface).set(float('nan'))
301 # this removes the complete metric, all labels...
302 # 1 single monitor job for all metrics of the SDN controller
303 # we can only remove from the pushgateway grouping keys(labels) which we have defined for the add_to_pushgateway
304 # we can not specify labels from the metrics to be removed
305 # if we need to remove the metrics seperatelty, we need to give them a separate grouping key, and probably a diffferent registry also
306 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
308 self
.monitor_lock
.release()
310 logging
.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
311 return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
313 # delete everything from this vnf
314 elif metric_dict
['vnf_name'] == vnf_name
and vnf_interface
is None and metric
is None:
315 self
.monitor_lock
.acquire()
316 self
.network_metrics
.remove(metric_dict
)
317 for collector
in self
.registry
._collectors
:
318 collector_dict
= collector
._metrics
.copy()
319 for name
, interface
, id in collector_dict
:
321 logging
.info('3 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
323 collector
.remove(name
, interface
, 'None')
325 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
326 self
.monitor_lock
.release()
327 logging
.info('Stopped monitoring vnf: {0}'.format(vnf_name
))
328 return 'Stopped monitoring: {0}'.format(vnf_name
)
330 return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric
, vnf_name
, vnf_interface
)
336 # get all metrics defined in the list and export it to Prometheus
337 def get_flow_metrics(self
):
338 while self
.start_monitoring
:
340 self
.monitor_flow_lock
.acquire()
342 for flow_dict
in self
.flow_metrics
:
345 data
['cookie'] = flow_dict
['cookie']
346 data
['cookie_mask'] = flow_dict
['cookie']
348 if 'tx' in flow_dict
['metric_key']:
349 data
['match'] = {'in_port':flow_dict
['mon_port']}
350 elif 'rx' in flow_dict
['metric_key']:
351 data
['out_port'] = flow_dict
['mon_port']
355 ret
= self
.net
.ryu_REST('stats/flow', dpid
=flow_dict
['switch_dpid'], data
=data
)
356 if isinstance(ret
, dict):
358 elif isinstance(ret
, basestring
):
359 flow_stat_dict
= ast
.literal_eval(ret
.rstrip())
361 flow_stat_dict
= None
363 logging
.debug('received flow stat:{0} '.format(flow_stat_dict
))
365 self
.set_flow_metric(flow_dict
, flow_stat_dict
)
367 self
.monitor_flow_lock
.release()
370 def get_network_metrics(self
):
371 while self
.start_monitoring
:
373 self
.monitor_lock
.acquire()
375 # group metrics by dpid to optimize the rest api calls
376 dpid_list
= [metric_dict
['switch_dpid'] for metric_dict
in self
.network_metrics
]
377 dpid_set
= set(dpid_list
)
379 for dpid
in dpid_set
:
382 ret
= self
.net
.ryu_REST('stats/port', dpid
=dpid
)
383 port_stat_dict
= ast
.literal_eval(ret
)
385 metric_list
= [metric_dict
for metric_dict
in self
.network_metrics
386 if int(metric_dict
['switch_dpid'])==int(dpid
)]
388 for metric_dict
in metric_list
:
389 self
.set_network_metric(metric_dict
, port_stat_dict
)
391 self
.monitor_lock
.release()
394 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
395 def set_network_metric(self
, metric_dict
, port_stat_dict
):
396 # vnf tx is the datacenter switch rx and vice-versa
397 metric_key
= self
.switch_tx_rx(metric_dict
['metric_key'])
398 switch_dpid
= metric_dict
['switch_dpid']
399 vnf_name
= metric_dict
['vnf_name']
400 vnf_interface
= metric_dict
['vnf_interface']
401 previous_measurement
= metric_dict
['previous_measurement']
402 previous_monitor_time
= metric_dict
['previous_monitor_time']
403 mon_port
= metric_dict
['mon_port']
405 for port_stat
in port_stat_dict
[str(switch_dpid
)]:
406 if int(port_stat
['port_no']) == int(mon_port
):
407 port_uptime
= port_stat
['duration_sec'] + port_stat
['duration_nsec'] * 10 ** (-9)
408 this_measurement
= int(port_stat
[metric_key
])
410 # set prometheus metric
411 self
.prom_metrics
[metric_dict
['metric_key']].\
412 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': None}).\
413 set(this_measurement
)
415 # 1 single monitor job for all metrics of the SDN controller
416 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
418 # also the rate is calculated here, but not used for now
419 # (rate can be easily queried from prometheus also)
420 if previous_monitor_time
<= 0 or previous_monitor_time
>= port_uptime
:
421 metric_dict
['previous_measurement'] = int(port_stat
[metric_key
])
422 metric_dict
['previous_monitor_time'] = port_uptime
423 # do first measurement
425 self
.monitor_lock
.release()
427 metric_rate
= self
.get_network_metrics()
431 time_delta
= (port_uptime
- metric_dict
['previous_monitor_time'])
432 metric_rate
= (this_measurement
- metric_dict
['previous_measurement']) / float(time_delta
)
434 metric_dict
['previous_measurement'] = this_measurement
435 metric_dict
['previous_monitor_time'] = port_uptime
438 logging
.exception('metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
))
439 return 'metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
)
441 def set_flow_metric(self
, metric_dict
, flow_stat_dict
):
442 # vnf tx is the datacenter switch rx and vice-versa
443 metric_key
= metric_dict
['metric_key']
444 switch_dpid
= metric_dict
['switch_dpid']
445 vnf_name
= metric_dict
['vnf_name']
446 vnf_interface
= metric_dict
['vnf_interface']
447 previous_measurement
= metric_dict
['previous_measurement']
448 previous_monitor_time
= metric_dict
['previous_monitor_time']
449 cookie
= metric_dict
['cookie']
452 for flow_stat
in flow_stat_dict
[str(switch_dpid
)]:
453 if 'bytes' in metric_key
:
454 counter
+= flow_stat
['byte_count']
455 elif 'packet' in metric_key
:
456 counter
+= flow_stat
['packet_count']
458 flow_stat
= flow_stat_dict
[str(switch_dpid
)][0]
459 flow_uptime
= flow_stat
['duration_sec'] + flow_stat
['duration_nsec'] * 10 ** (-9)
461 self
.prom_metrics
[metric_dict
['metric_key']]. \
462 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': cookie
}). \
465 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
467 logging
.warning("Pushgateway not reachable: {0} {1}".format(Exception, e
))
470 def start_Prometheus(self
, port
=CADVISOR_PORT
):
471 # prometheus.yml configuration file is located in the same directory as this file
475 "-p", "{0}:9090".format(port
),
476 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
477 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
478 "--name", "prometheus",
481 logging
.info('Start Prometheus container {0}'.format(cmd
))
484 def start_PushGateway(self
, port
=PUSHGATEWAY_PORT
):
488 "-p", "{0}:9091".format(port
),
489 "--name", "pushgateway",
493 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
496 def start_cAdvisor(self
, port
=8080):
500 "--volume=/:/rootfs:ro",
501 "--volume=/var/run:/var/run:rw",
502 "--volume=/sys:/sys:ro",
503 "--volume=/var/lib/docker/:/var/lib/docker:ro",
504 "--publish={0}:8080".format(port
),
506 "google/cadvisor:latest"
508 logging
.info('Start cAdvisor container {0}'.format(cmd
))
512 # stop the monitoring thread
513 self
.start_monitoring
= False
514 self
.monitor_thread
.join()
515 self
.monitor_flow_thread
.join()
517 # these containers are used for monitoring but are started now outside of son-emu
519 if self.prometheus_process is not None:
520 logging.info('stopping prometheus container')
521 self.prometheus_process.terminate()
522 self.prometheus_process.kill()
523 self._stop_container('prometheus')
525 if self.pushgateway_process is not None:
526 logging.info('stopping pushgateway container')
527 self.pushgateway_process.terminate()
528 self.pushgateway_process.kill()
529 self._stop_container('pushgateway')
531 if self.cadvisor_process is not None:
532 logging.info('stopping cadvisor container')
533 self.cadvisor_process.terminate()
534 self.cadvisor_process.kill()
535 self._stop_container('cadvisor')
538 def switch_tx_rx(self
,metric
=''):
539 # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
540 # so we need to change the metric name to be consistent with the vnf rx or tx
542 metric
= metric
.replace('tx','rx')
544 metric
= metric
.replace('rx','tx')
548 def _stop_container(self
, name
):