2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
import ast
import logging
import os
import threading
import time
from subprocess import Popen

from mininet.node import OVSSwitch
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
    pushadd_to_gateway, push_to_gateway, delete_from_gateway
logging.basicConfig(level=logging.INFO)

# host port of the Prometheus pushgateway container (see DCNetworkMonitor.start_PushGateway)
PUSHGATEWAY_PORT = 9091
# host port of the cAdvisor container (see DCNetworkMonitor.start_cAdvisor);
# we cannot use port 8080 because ryu-ofrest api is already using that one
CADVISOR_PORT = 8081


class DCNetworkMonitor():
    """Class to read OpenFlow port and flow stats from the Ryu controller of the
    DCNetwork and export them as Prometheus gauges through a pushgateway."""
52 def __init__(self
, net
):
56 self
.pushgateway
= 'localhost:{0}'.format(PUSHGATEWAY_PORT
)
58 # supported Prometheus metrics
59 self
.registry
= CollectorRegistry()
60 self
.prom_tx_packet_count
= Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
61 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
62 self
.prom_rx_packet_count
= Gauge('sonemu_rx_count_packets', 'Total number of packets received',
63 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
64 self
.prom_tx_byte_count
= Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
65 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
66 self
.prom_rx_byte_count
= Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
67 ['vnf_name', 'vnf_interface', 'flow_id'], registry
=self
.registry
)
69 self
.prom_metrics
={'tx_packets':self
.prom_tx_packet_count
, 'rx_packets':self
.prom_rx_packet_count
,
70 'tx_bytes':self
.prom_tx_byte_count
,'rx_bytes':self
.prom_rx_byte_count
}
72 # list of installed metrics to monitor
73 # each entry can contain this data
79 previous_measurement = 0
80 previous_monitor_time = 0
85 self
.monitor_lock
= threading
.Lock()
86 self
.monitor_flow_lock
= threading
.Lock()
87 self
.network_metrics
= []
88 self
.flow_metrics
= []
90 # start monitoring thread
91 self
.start_monitoring
= True
92 self
.monitor_thread
= threading
.Thread(target
=self
.get_network_metrics
)
93 self
.monitor_thread
.start()
95 self
.monitor_flow_thread
= threading
.Thread(target
=self
.get_flow_metrics
)
96 self
.monitor_flow_thread
.start()
99 # cAdvisor, Prometheus pushgateway are started as external container, to gather monitoring metric in son-emu
100 self
.pushgateway_process
= self
.start_PushGateway()
101 self
.cadvisor_process
= self
.start_cAdvisor()
104 # first set some parameters, before measurement can start
105 def setup_flow(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets', cookie
=0):
109 # check if port is specified (vnf:port)
110 if vnf_interface
is None:
111 # take first interface by default
112 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
113 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
114 vnf_interface
= link_dict
[0]['src_port_id']
116 flow_metric
['vnf_name'] = vnf_name
117 flow_metric
['vnf_interface'] = vnf_interface
120 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
121 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
122 for link
in link_dict
:
123 if link_dict
[link
]['src_port_id'] == vnf_interface
:
124 # found the right link and connected switch
125 vnf_switch
= connected_sw
126 flow_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
130 logging
.exception("vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
))
131 return "vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
)
134 # default port direction to monitor
136 metric
= 'tx_packets'
138 next_node
= self
.net
.getNodeByName(vnf_switch
)
140 if not isinstance(next_node
, OVSSwitch
):
141 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
144 flow_metric
['previous_measurement'] = 0
145 flow_metric
['previous_monitor_time'] = 0
147 flow_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
148 flow_metric
['metric_key'] = metric
149 flow_metric
['cookie'] = cookie
151 self
.monitor_flow_lock
.acquire()
152 self
.flow_metrics
.append(flow_metric
)
153 self
.monitor_flow_lock
.release()
155 logging
.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
156 return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
158 except Exception as ex
:
159 logging
.exception("setup_metric error.")
162 def stop_flow(self
, vnf_name
, vnf_interface
=None, metric
=None, cookie
=0,):
164 # check if port is specified (vnf:port)
165 if vnf_interface
is None and metric
is not None:
166 # take first interface by default
167 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
168 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
169 vnf_interface
= link_dict
[0]['src_port_id']
171 for flow_dict
in self
.flow_metrics
:
172 if flow_dict
['vnf_name'] == vnf_name
and flow_dict
['vnf_interface'] == vnf_interface \
173 and flow_dict
['metric_key'] == metric
and flow_dict
['cookie'] == cookie
:
175 self
.monitor_flow_lock
.acquire()
177 self
.flow_metrics
.remove(flow_dict
)
179 for collector
in self
.registry
._collectors
:
180 if (vnf_name
, vnf_interface
, cookie
) in collector
._metrics
:
181 collector
.remove(vnf_name
, vnf_interface
, cookie
)
183 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
185 self
.monitor_flow_lock
.release()
187 logging
.info('Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
188 return 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
190 return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric
, vnf_name
, vnf_interface
)
193 # first set some parameters, before measurement can start
194 def setup_metric(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets'):
198 # check if port is specified (vnf:port)
199 if vnf_interface
is None:
200 # take first interface by default
201 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
202 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
203 vnf_interface
= link_dict
[0]['src_port_id']
205 network_metric
['vnf_name'] = vnf_name
206 network_metric
['vnf_interface'] = vnf_interface
208 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
209 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
210 for link
in link_dict
:
211 if link_dict
[link
]['src_port_id'] == vnf_interface
:
212 # found the right link and connected switch
213 network_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
216 if 'mon_port' not in network_metric
:
217 logging
.exception("vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
))
218 return "vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
)
221 # default port direction to monitor
223 metric
= 'tx_packets'
225 vnf_switch
= self
.net
.DCNetwork_graph
.neighbors(str(vnf_name
))
227 if len(vnf_switch
) > 1:
228 logging
.info("vnf: {0} has multiple ports".format(vnf_name
))
230 elif len(vnf_switch
) == 0:
231 logging
.info("vnf: {0} is not connected".format(vnf_name
))
234 vnf_switch
= vnf_switch
[0]
235 next_node
= self
.net
.getNodeByName(vnf_switch
)
237 if not isinstance(next_node
, OVSSwitch
):
238 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
241 network_metric
['previous_measurement'] = 0
242 network_metric
['previous_monitor_time'] = 0
245 network_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
246 network_metric
['metric_key'] = metric
248 self
.monitor_lock
.acquire()
250 self
.network_metrics
.append(network_metric
)
251 self
.monitor_lock
.release()
254 logging
.info('Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
255 return 'Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
257 except Exception as ex
:
258 logging
.exception("setup_metric error.")
261 def stop_metric(self
, vnf_name
, vnf_interface
=None, metric
=None):
263 # check if port is specified (vnf:port)
264 if vnf_interface
is None and metric
is not None:
265 # take first interface by default
266 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
267 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
268 vnf_interface
= link_dict
[0]['src_port_id']
270 for metric_dict
in self
.network_metrics
:
271 if metric_dict
['vnf_name'] == vnf_name
and metric_dict
['vnf_interface'] == vnf_interface \
272 and metric_dict
['metric_key'] == metric
:
274 self
.monitor_lock
.acquire()
276 self
.network_metrics
.remove(metric_dict
)
278 #this removes the complete metric, all labels...
279 #REGISTRY.unregister(self.prom_metrics[metric_dict['metric_key']])
280 #self.registry.unregister(self.prom_metrics[metric_dict['metric_key']])
282 for collector
in self
.registry
._collectors
:
285 INFO:root:name:sonemu_rx_count_packets
286 labels:('vnf_name', 'vnf_interface')
287 metrics:{(u'tsrc', u'output'): < prometheus_client.core.Gauge
292 logging
.info('{0}'.format(collector
._metrics
.values()))
294 if (vnf_name
, vnf_interface
, 'None') in collector
._metrics
:
295 logging
.info('2 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
297 collector
.remove(vnf_name
, vnf_interface
, 'None')
299 # set values to NaN, prometheus api currently does not support removal of metrics
300 #self.prom_metrics[metric_dict['metric_key']].labels(vnf_name, vnf_interface).set(float('nan'))
302 # this removes the complete metric, all labels...
303 # 1 single monitor job for all metrics of the SDN controller
304 # we can only remove from the pushgateway grouping keys(labels) which we have defined for the add_to_pushgateway
305 # we can not specify labels from the metrics to be removed
306 # if we need to remove the metrics seperatelty, we need to give them a separate grouping key, and probably a diffferent registry also
307 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
309 self
.monitor_lock
.release()
311 logging
.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
312 return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
314 # delete everything from this vnf
315 elif metric_dict
['vnf_name'] == vnf_name
and vnf_interface
is None and metric
is None:
316 self
.monitor_lock
.acquire()
317 self
.network_metrics
.remove(metric_dict
)
318 for collector
in self
.registry
._collectors
:
319 collector_dict
= collector
._metrics
.copy()
320 for name
, interface
, id in collector_dict
:
322 logging
.info('3 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
324 collector
.remove(name
, interface
, 'None')
326 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
327 self
.monitor_lock
.release()
328 logging
.info('Stopped monitoring vnf: {0}'.format(vnf_name
))
329 return 'Stopped monitoring: {0}'.format(vnf_name
)
331 return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric
, vnf_name
, vnf_interface
)
337 # get all metrics defined in the list and export it to Prometheus
338 def get_flow_metrics(self
):
339 while self
.start_monitoring
:
341 self
.monitor_flow_lock
.acquire()
343 for flow_dict
in self
.flow_metrics
:
346 data
['cookie'] = flow_dict
['cookie']
347 data
['cookie_mask'] = flow_dict
['cookie']
349 if 'tx' in flow_dict
['metric_key']:
350 data
['match'] = {'in_port':flow_dict
['mon_port']}
351 elif 'rx' in flow_dict
['metric_key']:
352 data
['out_port'] = flow_dict
['mon_port']
356 ret
= self
.net
.ryu_REST('stats/flow', dpid
=flow_dict
['switch_dpid'], data
=data
)
357 if isinstance(ret
, dict):
359 elif isinstance(ret
, basestring
):
360 flow_stat_dict
= ast
.literal_eval(ret
.rstrip())
362 flow_stat_dict
= None
364 logging
.debug('received flow stat:{0} '.format(flow_stat_dict
))
366 self
.set_flow_metric(flow_dict
, flow_stat_dict
)
368 self
.monitor_flow_lock
.release()
371 def get_network_metrics(self
):
372 while self
.start_monitoring
:
374 self
.monitor_lock
.acquire()
376 # group metrics by dpid to optimize the rest api calls
377 dpid_list
= [metric_dict
['switch_dpid'] for metric_dict
in self
.network_metrics
]
378 dpid_set
= set(dpid_list
)
380 for dpid
in dpid_set
:
383 ret
= self
.net
.ryu_REST('stats/port', dpid
=dpid
)
384 port_stat_dict
= ast
.literal_eval(ret
)
386 metric_list
= [metric_dict
for metric_dict
in self
.network_metrics
387 if int(metric_dict
['switch_dpid'])==int(dpid
)]
389 for metric_dict
in metric_list
:
390 self
.set_network_metric(metric_dict
, port_stat_dict
)
392 self
.monitor_lock
.release()
395 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
396 def set_network_metric(self
, metric_dict
, port_stat_dict
):
397 # vnf tx is the datacenter switch rx and vice-versa
398 metric_key
= self
.switch_tx_rx(metric_dict
['metric_key'])
399 switch_dpid
= metric_dict
['switch_dpid']
400 vnf_name
= metric_dict
['vnf_name']
401 vnf_interface
= metric_dict
['vnf_interface']
402 previous_measurement
= metric_dict
['previous_measurement']
403 previous_monitor_time
= metric_dict
['previous_monitor_time']
404 mon_port
= metric_dict
['mon_port']
406 for port_stat
in port_stat_dict
[str(switch_dpid
)]:
407 if int(port_stat
['port_no']) == int(mon_port
):
408 port_uptime
= port_stat
['duration_sec'] + port_stat
['duration_nsec'] * 10 ** (-9)
409 this_measurement
= int(port_stat
[metric_key
])
411 # set prometheus metric
412 self
.prom_metrics
[metric_dict
['metric_key']].\
413 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': None}).\
414 set(this_measurement
)
416 # 1 single monitor job for all metrics of the SDN controller
417 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
419 # also the rate is calculated here, but not used for now
420 # (rate can be easily queried from prometheus also)
421 if previous_monitor_time
<= 0 or previous_monitor_time
>= port_uptime
:
422 metric_dict
['previous_measurement'] = int(port_stat
[metric_key
])
423 metric_dict
['previous_monitor_time'] = port_uptime
424 # do first measurement
426 self
.monitor_lock
.release()
428 metric_rate
= self
.get_network_metrics()
432 time_delta
= (port_uptime
- metric_dict
['previous_monitor_time'])
433 metric_rate
= (this_measurement
- metric_dict
['previous_measurement']) / float(time_delta
)
435 metric_dict
['previous_measurement'] = this_measurement
436 metric_dict
['previous_monitor_time'] = port_uptime
439 logging
.exception('metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
))
440 return 'metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
)
442 def set_flow_metric(self
, metric_dict
, flow_stat_dict
):
443 # vnf tx is the datacenter switch rx and vice-versa
444 metric_key
= metric_dict
['metric_key']
445 switch_dpid
= metric_dict
['switch_dpid']
446 vnf_name
= metric_dict
['vnf_name']
447 vnf_interface
= metric_dict
['vnf_interface']
448 previous_measurement
= metric_dict
['previous_measurement']
449 previous_monitor_time
= metric_dict
['previous_monitor_time']
450 cookie
= metric_dict
['cookie']
453 for flow_stat
in flow_stat_dict
[str(switch_dpid
)]:
454 if 'bytes' in metric_key
:
455 counter
+= flow_stat
['byte_count']
456 elif 'packet' in metric_key
:
457 counter
+= flow_stat
['packet_count']
459 flow_stat
= flow_stat_dict
[str(switch_dpid
)][0]
460 flow_uptime
= flow_stat
['duration_sec'] + flow_stat
['duration_nsec'] * 10 ** (-9)
462 self
.prom_metrics
[metric_dict
['metric_key']]. \
463 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': cookie
}). \
466 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
468 logging
.warning("Pushgateway not reachable: {0} {1}".format(Exception, e
))
471 def start_Prometheus(self
, port
=9090):
# Start a Prometheus server container (presumably via `docker run`, judging by
# the visible -p/-v/--name arguments): host `port` is published to container
# port 9090, and prometheus.yml / profile.rules from this file's directory are
# bind-mounted into /etc/prometheus. The container is named "prometheus".
# NOTE(review): the start of the `cmd` list, the image name, the closing bracket
# and the launch/return statement are not visible in this view, and the visible
# __init__ never calls this method; confirm against the full file.
472 # prometheus.yml configuration file is located in the same directory as this file
476 "-p", "{0}:9090".format(port
),
477 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
478 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
479 "--name", "prometheus",
# log the assembled argument list
482 logging
.info('Start Prometheus container {0}'.format(cmd
))
485 def start_PushGateway(self
, port
=PUSHGATEWAY_PORT
):
# Start the Prometheus pushgateway container named "pushgateway", publishing
# host `port` (default PUSHGATEWAY_PORT) to container port 9091.
# __init__ stores the result as self.pushgateway_process and pushes metrics to
# 'localhost:PUSHGATEWAY_PORT', so a different `port` would not be used by the
# push calls.
# NOTE(review): the start of the `cmd` list, the image name and the
# launch/return statement are not visible here. The shutdown code calls
# terminate()/kill() on the stored result, so it presumably returns a
# subprocess.Popen; confirm.
489 "-p", "{0}:9091".format(port
),
490 "--name", "pushgateway",
# log the assembled argument list
494 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
497 def start_cAdvisor(self
, port
=CADVISOR_PORT
):
# Start a cAdvisor container (image google/cadvisor:latest). Host directories
# are mounted into it: / as /rootfs, /sys and /var/lib/docker read-only, and
# /var/run read-write. The container's port 8080 is published on host `port`.
# NOTE(review): CADVISOR_PORT is not defined anywhere in this view. Default
# arguments are evaluated when the class body runs, so it must exist at module
# level before this class. The head of the `cmd` list and the launch/return
# statement are also not visible here. __init__ stores the result as
# self.cadvisor_process (presumably a subprocess.Popen).
501 "--volume=/:/rootfs:ro",
502 "--volume=/var/run:/var/run:rw",
503 "--volume=/sys:/sys:ro",
504 "--volume=/var/lib/docker/:/var/lib/docker:ro",
505 "--publish={0}:8080".format(port
),
507 "google/cadvisor:latest"
# log the assembled argument list
509 logging
.info('Start cAdvisor container {0}'.format(cmd
))
# Shutdown sequence (the enclosing method's `def` line is not visible in this
# view). Clearing start_monitoring makes the get_network_metrics and
# get_flow_metrics loops exit; both threads are then joined, and the helper
# processes started in __init__ are stopped.
513 # stop the monitoring thread
514 self
.start_monitoring
= False
515 self
.monitor_thread
.join()
516 self
.monitor_flow_thread
.join()
518 # these containers are used for monitoring but are started now outside of son-emu
# NOTE(review): self.prometheus_process is not assigned in the visible __init__
# (only pushgateway_process and cadvisor_process are), so this check raises
# AttributeError unless it is set elsewhere. These five lines are also formatted
# differently from the surrounding code, which hints that they were originally
# inside a string literal (i.e. disabled). Confirm against the full file.
520 if self.prometheus_process is not None:
521 logging.info('stopping prometheus container')
522 self.prometheus_process.terminate()
523 self.prometheus_process.kill()
524 self._stop_container('prometheus')
# For each helper: call terminate() and then kill() on the handle returned by
# start_PushGateway()/start_cAdvisor() (presumably a subprocess.Popen; their
# return statements are not visible), then call _stop_container(<name>), whose
# body is not visible here. 'pushgateway' matches the --name passed in
# start_PushGateway.
526 if self
.pushgateway_process
is not None:
527 logging
.info('stopping pushgateway container')
528 self
.pushgateway_process
.terminate()
529 self
.pushgateway_process
.kill()
530 self
._stop
_container
('pushgateway')
532 if self
.cadvisor_process
is not None:
533 logging
.info('stopping cadvisor container')
534 self
.cadvisor_process
.terminate()
535 self
.cadvisor_process
.kill()
536 self
._stop
_container
('cadvisor')
538 def switch_tx_rx(self
,metric
=''):
539 # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
540 # so we need to change the metric name to be consistent with the vnf rx or tx
542 metric
= metric
.replace('tx','rx')
544 metric
= metric
.replace('rx','tx')
548 def _stop_container(self
, name
):