2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
31 from mininet
.node
import OVSSwitch
34 from prometheus_client
import start_http_server
, Summary
, Histogram
, Gauge
, Counter
, REGISTRY
, CollectorRegistry
, \
35 pushadd_to_gateway
, push_to_gateway
, delete_from_gateway
37 from subprocess
import Popen
# Configure the root logger at INFO level; this runs as a side effect of
# importing the module.
logging.basicConfig(level=logging.INFO)

# This module provides DCNetworkMonitor, a class that reads OpenFlow
# statistics from the Ryu controller of the DCNetwork.

# Host port on which the Prometheus push gateway container is published.
PUSHGATEWAY_PORT = 9091
# we cannot use port 8080 because ryu-ofrest api is already using that one
# Sent as 'cookie_mask' with every Ryu 'stats/flow' request of the flow
# poller; all 32 low bits set.
COOKIE_MASK = (1 << 32) - 1
53 class DCNetworkMonitor():
def __init__(self, net):
    """Create the monitor for the emulated DC network ``net``.

    Creates the Prometheus gauges in a private registry, starts the
    port-statistics and flow-statistics polling threads, and launches the
    push gateway and cAdvisor helper containers.

    :param net: the DCNetwork to monitor; its ``DCNetwork_graph``,
        ``getNodeByName`` and ``ryu_REST`` members are used by this class.
    """
    self.net = net
    # pushgateway address
    self.pushgateway = 'localhost:{0}'.format(PUSHGATEWAY_PORT)

    # supported Prometheus metrics, all registered in one private registry
    self.registry = CollectorRegistry()
    label_names = ['vnf_name', 'vnf_interface', 'flow_id']
    self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                      label_names, registry=self.registry)
    self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                      label_names, registry=self.registry)
    self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                    label_names, registry=self.registry)
    self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                    label_names, registry=self.registry)

    # metric key -> gauge used when exporting a measurement
    self.prom_metrics = {
        'tx_packets': self.prom_tx_packet_count,
        'rx_packets': self.prom_rx_packet_count,
        'tx_bytes': self.prom_tx_byte_count,
        'rx_bytes': self.prom_rx_byte_count,
    }

    # Installed metrics to monitor. Each entry is a dict that can hold:
    #   switch_dpid, vnf_name, vnf_interface, previous_measurement,
    #   previous_monitor_time, metric_key, mon_port (+ cookie for flows)
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()
    self.network_metrics = []
    self.flow_metrics = []

    # start the polling threads; they run while start_monitoring is True
    self.start_monitoring = True
    self.monitor_thread = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread.start()
    self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread.start()

    # cAdvisor, Prometheus pushgateway are started as external container, to gather monitoring metric in son-emu
    self.pushgateway_process = self.start_PushGateway()
    self.cadvisor_process = self.start_cAdvisor()
# first set some parameters, before measurement can start
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """Register a flow metric so the flow-polling thread exports it.

    The flows tagged with ``cookie`` are read from the switch that
    ``vnf_name:vnf_interface`` is linked to. The entry appended to
    ``self.flow_metrics`` is consumed by get_flow_metrics/set_flow_metric.

    :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``.
    :param vnf_interface: VNF interface (``src_port_id`` of its link); when
        None, the interface of the first link to the first neighbour is used.
    :param metric: metric key such as 'tx_packets' or 'rx_bytes';
        'tx_packets' when None.
    :param cookie: OpenFlow cookie of the flows to count.
    :return: a status string (success or error message).
    """
    try:
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            connected_sw = self.net.DCNetwork_graph.neighbors(vnf_name)[0]
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            vnf_interface = link_dict[0]['src_port_id']

        flow_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

        # find the switch (and switch port) the VNF interface is linked to
        vnf_switch = None
        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    vnf_switch = connected_sw
                    flow_metric['mon_port'] = link_dict[link]['dst_port_nr']
                    break
            if vnf_switch is not None:
                break

        if vnf_switch is None:
            # not inside an except clause: log with error(), not exception()
            logging.error("vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface))
            return "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)

        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)
        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return "vnf: {0} is not connected to switch".format(vnf_name)

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0
        # the node's dpid is a hex string; store it as an int
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        logging.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie))
        return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)

    except Exception as ex:
        logging.exception("setup_flow error.")
        return str(ex)
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0,):
    """Stop exporting a flow metric previously registered with setup_flow.

    :param vnf_name: VNF the flow metric belongs to.
    :param vnf_interface: VNF interface; when None and ``metric`` is given,
        the interface of the first link of the VNF is used.
    :param metric: metric key the flow was registered with.
    :param cookie: OpenFlow cookie the flow was registered with.
    :return: a status string (success or error message).
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        connected_sw = self.net.DCNetwork_graph.neighbors(vnf_name)[0]
        link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
        vnf_interface = link_dict[0]['src_port_id']

    # Hold the lock for the search as well, so the polling thread cannot see
    # a half-updated list; `with` releases it even if a call below raises.
    with self.monitor_flow_lock:
        match = None
        for flow_dict in self.flow_metrics:
            if (flow_dict['vnf_name'] == vnf_name
                    and flow_dict['vnf_interface'] == vnf_interface
                    and flow_dict['metric_key'] == metric
                    and flow_dict['cookie'] == cookie):
                match = flow_dict
                break

        if match is None:
            return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)

        self.flow_metrics.remove(match)

        # Label children are keyed by the *string* form of the label values
        # (e.g. flow_id None is stored as 'None'), so convert the cookie.
        labels = (vnf_name, vnf_interface, str(cookie))
        for collector in self.registry._collectors:
            if labels in collector._metrics:
                collector.remove(*labels)

        # one job holds all SDN-controller metrics; it can only be deleted as a whole
        try:
            delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0} {1}".format(type(e).__name__, e))

    logging.info('Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie))
    return 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
# first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Register a port-counter metric so the polling thread exports it.

    The counter is read from the switch port that ``vnf_name:vnf_interface``
    is linked to. The entry appended to ``self.network_metrics`` is consumed
    by get_network_metrics/set_network_metric.

    :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``.
    :param vnf_interface: VNF interface (``src_port_id`` of its link); when
        None, the interface of the first link to the first neighbour is used.
    :param metric: metric key such as 'tx_packets' or 'rx_bytes';
        'tx_packets' when None.
    :return: a status string (success or error message).
    """
    try:
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            connected_sw = self.net.DCNetwork_graph.neighbors(vnf_name)[0]
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            vnf_interface = link_dict[0]['src_port_id']

        network_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

        # Find the switch and switch port the interface is linked to. The
        # switch must come from the matching link: mon_port is a port number
        # of that switch, and a VNF may have links to several switches.
        vnf_switch = None
        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    vnf_switch = connected_sw
                    network_metric['mon_port'] = link_dict[link]['dst_port_nr']
                    break
            if vnf_switch is not None:
                break

        if vnf_switch is None:
            # not inside an except clause: log with error(), not exception()
            logging.error("vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface))
            return "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)

        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)
        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return "vnf: {0} is not connected to switch".format(vnf_name)

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0
        # the node's dpid is a hex string; store it as an int
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        logging.info('Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric))
        return 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """Stop exporting port-counter metrics of a VNF.

    Two modes are supported:

    * ``metric`` given: stop that metric on ``vnf_interface`` (the interface
      of the first link of the VNF when ``vnf_interface`` is None).
    * ``vnf_interface`` and ``metric`` both None: stop every network metric
      of ``vnf_name``.

    Any other combination matches nothing and returns the error string.

    :return: a status string (success or error message).
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        connected_sw = self.net.DCNetwork_graph.neighbors(vnf_name)[0]
        link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
        vnf_interface = link_dict[0]['src_port_id']

    # delete everything from this vnf
    remove_all = vnf_interface is None and metric is None

    # `with` releases the lock even if a call below raises
    with self.monitor_lock:
        if remove_all:
            stopped = [m for m in self.network_metrics if m['vnf_name'] == vnf_name]
        else:
            stopped = [m for m in self.network_metrics
                       if m['vnf_name'] == vnf_name
                       and m['vnf_interface'] == vnf_interface
                       and m['metric_key'] == metric]

        if not stopped:
            return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)

        # remove every matching entry; any entry left behind would be
        # exported again by the polling thread
        for metric_dict in stopped:
            self.network_metrics.remove(metric_dict)

        # Drop the matching label children. Network metrics are exported with
        # flow_id None, stored as the string 'None'; children with another
        # flow_id belong to flow metrics and are left alone.
        if remove_all:
            gauges = list(self.prom_metrics.values())
        else:
            gauges = [self.prom_metrics[metric]] if metric in self.prom_metrics else []
        for gauge in gauges:
            stale = [labels for labels in gauge._metrics
                     if labels[0] == vnf_name and labels[2] == 'None'
                     and (remove_all or labels[1] == vnf_interface)]
            for labels in stale:
                gauge.remove(*labels)

        # 1 single monitor job for all metrics of the SDN controller.
        # The pushgateway can only delete by grouping key (the job), not by
        # metric labels, so the whole job is deleted; metrics that are still
        # monitored are pushed again on the next poll.
        try:
            delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0} {1}".format(type(e).__name__, e))

    if remove_all:
        logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
        return 'Stopped monitoring: {0}'.format(vnf_name)

    logging.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric))
    return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
# get all metrics defined in the list and export it to Prometheus
def get_flow_metrics(self, interval=1):
    """Poll flow statistics for all registered flow metrics until stopped.

    Body of ``self.monitor_flow_thread``. While ``self.start_monitoring`` is
    true, it queries Ryu's 'stats/flow' for every entry of
    ``self.flow_metrics``, passes the reply to set_flow_metric, then sleeps.

    :param interval: seconds to sleep between polling rounds (default 1).
    """
    import time  # local import keeps this block self-contained

    while self.start_monitoring:
        # `with` releases the lock even when a poll fails; a failing flow is
        # logged and skipped instead of killing the thread with the lock held
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                try:
                    data = {'cookie': flow_dict['cookie'], 'cookie_mask': COOKIE_MASK}
                    if 'tx' in flow_dict['metric_key']:
                        data['match'] = {'in_port': flow_dict['mon_port']}
                    elif 'rx' in flow_dict['metric_key']:
                        data['out_port'] = flow_dict['mon_port']

                    # query Ryu
                    ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                    if isinstance(ret, dict):
                        flow_stat_dict = ret
                    elif isinstance(ret, basestring):
                        flow_stat_dict = ast.literal_eval(ret.rstrip())
                    else:
                        flow_stat_dict = None

                    logging.debug('received flow stat:{0} '.format(flow_stat_dict))

                    self.set_flow_metric(flow_dict, flow_stat_dict)
                except Exception:
                    logging.exception('reading flow stats of {0}:{1} failed'.format(
                        flow_dict.get('vnf_name'), flow_dict.get('vnf_interface')))

        # sleep outside the lock so setup_flow/stop_flow are not blocked
        time.sleep(interval)
def get_network_metrics(self, interval=1):
    """Poll port statistics for all registered network metrics until stopped.

    Body of ``self.monitor_thread``. While ``self.start_monitoring`` is true,
    it queries Ryu's 'stats/port' once per switch dpid referenced by
    ``self.network_metrics``, passes the reply to set_network_metric for each
    metric of that switch, then sleeps.

    :param interval: seconds to sleep between polling rounds (default 1).
    """
    import time  # local import keeps this block self-contained

    while self.start_monitoring:
        # `with` releases the lock even when a poll fails; a failing switch is
        # logged and skipped instead of killing the thread with the lock held
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            dpid_set = set(metric_dict['switch_dpid'] for metric_dict in self.network_metrics)

            for dpid in dpid_set:
                try:
                    # query Ryu
                    ret = self.net.ryu_REST('stats/port', dpid=dpid)
                    if isinstance(ret, dict):
                        port_stat_dict = ret
                    elif isinstance(ret, basestring):
                        port_stat_dict = ast.literal_eval(ret.rstrip())
                    else:
                        port_stat_dict = None

                    metric_list = [metric_dict for metric_dict in self.network_metrics
                                   if int(metric_dict['switch_dpid']) == int(dpid)]

                    for metric_dict in metric_list:
                        self.set_network_metric(metric_dict, port_stat_dict)
                except Exception:
                    logging.exception('reading port stats of switch {0} failed'.format(dpid))

        # sleep outside the lock so setup_metric/stop_metric are not blocked
        time.sleep(interval)
# add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """Export one port counter and update the rate bookkeeping of metric_dict.

    :param metric_dict: an entry of ``self.network_metrics`` (see setup_metric).
    :param port_stat_dict: parsed Ryu 'stats/port' reply, keyed by the switch
        dpid as a string; each value is a list of per-port stat dicts.
        None or a reply without that dpid is treated as "port not found".
    :return: the counter rate per second since the previous call; None for
        a first measurement; an error string if the monitored port is not in
        the reply.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = int(metric_dict['mon_port'])

    port_stats = (port_stat_dict or {}).get(str(metric_dict['switch_dpid']), [])
    for port_stat in port_stats:
        if int(port_stat['port_no']) != mon_port:
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric
        self.prom_metrics[metric_dict['metric_key']].\
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\
            set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        try:
            pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
        except Exception as e:
            logging.warning("Pushgateway not reachable: {0} {1}".format(type(e).__name__, e))

        # also the rate is calculated here, but not used for now
        # (rate can be easily queried from prometheus also)
        previous_monitor_time = metric_dict['previous_monitor_time']
        if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:
            # First measurement, or the port uptime went backwards (e.g. the
            # port was recreated): only store a baseline. The caller holds
            # self.monitor_lock, so this method must not release it or
            # re-enter the polling loop.
            metric_rate = None
        else:
            time_delta = port_uptime - previous_monitor_time
            metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)

        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return metric_rate

    # not inside an except clause: log with error(), not exception()
    logging.error('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))
    return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """Export the summed packet or byte counter of the flows of one flow metric.

    :param metric_dict: an entry of ``self.flow_metrics`` (see setup_flow).
    :param flow_stat_dict: parsed Ryu 'stats/flow' reply keyed by the switch
        dpid as a string; each value is a list of flow stat dicts. When the
        reply is None or lacks the dpid, nothing is exported.
    """
    metric_key = metric_dict['metric_key']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    cookie = metric_dict['cookie']
    dpid_key = str(metric_dict['switch_dpid'])

    if not flow_stat_dict or dpid_key not in flow_stat_dict:
        logging.warning('no flow stats for switch {0} ({1}:{2})'.format(dpid_key, vnf_name, vnf_interface))
        return

    # sum the counter over all returned flows; an empty list exports 0
    if 'bytes' in metric_key:
        stat_field = 'byte_count'
    elif 'packet' in metric_key:
        stat_field = 'packet_count'
    else:
        stat_field = None
    flow_stats = flow_stat_dict[dpid_key]
    counter = sum(flow_stat[stat_field] for flow_stat in flow_stats) if stat_field else 0

    self.prom_metrics[metric_key].\
        labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}).\
        set(counter)

    # 1 single monitor job for all metrics of the SDN controller
    try:
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
    except Exception as e:
        logging.warning("Pushgateway not reachable: {0} {1}".format(type(e).__name__, e))
def start_Prometheus(self, port=9090):
    """Launch a Prometheus server container, published on host ``port``.

    The prometheus.yml and profile.rules files next to this module are
    mounted into the container's /etc/prometheus directory.

    :return: the Popen handle of the ``docker run`` process.
    """
    # prometheus.yml configuration file is located in the same directory as this file
    config_dir = os.path.dirname(os.path.abspath(__file__))
    mounts = [
        "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(config_dir),
        "{0}/profile.rules:/etc/prometheus/profile.rules".format(config_dir),
    ]
    cmd = ["docker", "run", "--rm", "-p", "{0}:9090".format(port)]
    for mount in mounts:
        cmd.extend(["-v", mount])
    cmd.extend(["--name", "prometheus", "prom/prometheus"])
    logging.info('Start Prometheus container {0}'.format(cmd))
    return Popen(cmd)
def start_PushGateway(self, port=PUSHGATEWAY_PORT):
    """Launch the Prometheus push gateway container, published on host ``port``.

    :return: the Popen handle of the ``docker run`` process.
    """
    publish = "{0}:9091".format(port)
    cmd = ["docker", "run", "--rm"]
    cmd += ["-p", publish]
    cmd += ["--name", "pushgateway"]
    cmd += ["prom/pushgateway"]
    logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
    return Popen(cmd)
def start_cAdvisor(self, port=CADVISOR_PORT):
    """Launch the cAdvisor container, publishing its port 8080 on host ``port``.

    :return: the Popen handle of the ``docker run`` process.
    """
    # host paths mounted into the container
    volumes = ["/:/rootfs:ro",
               "/var/run:/var/run:rw",
               "/sys:/sys:ro",
               "/var/lib/docker/:/var/lib/docker:ro"]
    cmd = ["docker", "run", "--rm"]
    cmd += ["--volume={0}".format(volume) for volume in volumes]
    cmd += ["--publish={0}:8080".format(port),
            "--name=cadvisor",
            "google/cadvisor:latest"]
    logging.info('Start cAdvisor container {0}'.format(cmd))
    return Popen(cmd)
def stop(self):
    """Stop the polling threads and the helper containers of this monitor.

    Blocks until both monitoring threads have returned, then terminates the
    push gateway and cAdvisor ``docker run`` processes (when present) and
    removes their containers via _stop_container.
    """
    # stop the monitoring threads; each finishes its current polling round
    self.start_monitoring = False
    self.monitor_thread.join()
    self.monitor_flow_thread.join()

    for name, process in (('pushgateway', self.pushgateway_process),
                          ('cadvisor', self.cadvisor_process)):
        if process is None:
            continue
        logging.info('stopping {0} container'.format(name))
        process.terminate()
        process.kill()
        # reap the killed docker client so it does not linger as a zombie
        process.wait()
        self._stop_container(name)
def switch_tx_rx(self, metric=''):
    """Map a VNF-side metric name to the matching switch-side counter name.

    When monitoring VNFs, the tx of the datacenter switch is actually the rx
    of the VNF (and vice versa), so the direction in the name is swapped.

    :param metric: metric name such as 'tx_packets' or 'rx_bytes'.
    :return: if the name contains 'tx', every 'tx' replaced by 'rx';
        otherwise, if it contains 'rx', every 'rx' replaced by 'tx';
        otherwise the name unchanged.
    """
    for own, peer in (('tx', 'rx'), ('rx', 'tx')):
        if own in metric:
            return metric.replace(own, peer)
    return metric
555 def _stop_container(self
, name
):