2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
import ast
import logging
import os
import threading
import time
from subprocess import Popen

from mininet.node import OVSSwitch
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
    pushadd_to_gateway, push_to_gateway, delete_from_gateway
# Configure the root logger to emit messages at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Class that reads OpenFlow statistics from the Ryu controller of the DCNetwork.
46 class DCNetworkMonitor():
def __init__(self, net):
    """Create the Prometheus gauges, start both polling threads and cAdvisor.

    Args:
        net: the emulated network. The other methods of this class read
            ``DCNetwork_graph`` from it and call ``getNodeByName`` and
            ``ryu_REST`` on it.
    """
    # NOTE(review): this assignment is absent from the extracted source,
    # yet every other method reads ``self.net`` -- restored here.
    self.net = net

    # TODO: these global variables should be part of a config file?
    # Prometheus is started outside of son-emu, so this stays disabled:
    #   prometheus_ip = '127.0.0.1'
    #   prometheus_port = '9090'
    #   self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)

    # The pushgateway is started outside of son-emu and son-emu is started
    # with net=host, so localhost:9091 works.
    self.pushgateway = 'localhost:9091'
    # When the sdk is started with docker-compose, we could use
    #   self.pushgateway = 'pushgateway:9091'
    # Start up the server to expose the metrics to Prometheus:
    #   start_http_server(8000)

    # Supported Prometheus metrics; all gauges share one private registry.
    label_names = ['vnf_name', 'vnf_interface', 'flow_id']
    self.registry = CollectorRegistry()
    self.prom_tx_packet_count = Gauge(
        'sonemu_tx_count_packets', 'Total number of packets sent',
        label_names, registry=self.registry)
    self.prom_rx_packet_count = Gauge(
        'sonemu_rx_count_packets', 'Total number of packets received',
        label_names, registry=self.registry)
    self.prom_tx_byte_count = Gauge(
        'sonemu_tx_count_bytes', 'Total number of bytes sent',
        label_names, registry=self.registry)
    self.prom_rx_byte_count = Gauge(
        'sonemu_rx_count_bytes', 'Total number of bytes received',
        label_names, registry=self.registry)

    # Lookup table from the metric key used by the setup_*/stop_* methods
    # to the gauge that exports it.
    self.prom_metrics = {
        'tx_packets': self.prom_tx_packet_count,
        'rx_packets': self.prom_rx_packet_count,
        'tx_bytes': self.prom_tx_byte_count,
        'rx_bytes': self.prom_rx_byte_count,
    }

    # Lists of installed metrics to monitor. Each entry is a dict filled in
    # by setup_metric()/setup_flow() with the keys: switch_dpid, vnf_name,
    # vnf_interface, previous_measurement, previous_monitor_time,
    # metric_key, mon_port (flow entries additionally carry 'cookie').
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()
    self.network_metrics = []
    self.flow_metrics = []

    # Start the monitoring threads; both loop while start_monitoring is True.
    self.start_monitoring = True
    self.monitor_thread = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread.start()

    self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread.start()

    # Prometheus pushgateway and DB are started as external containers,
    # outside of son-emu:
    #   self.pushgateway_process = self.start_PushGateway()
    #   self.prometheus_process = self.start_Prometheus()
    self.cadvisor_process = self.start_cadvisor()
# Register a flow-level metric; this must be done before its measurement can start.
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """Register a flow on a VNF interface for periodic monitoring.

    Args:
        vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``.
        vnf_interface: interface name, matched against the ``src_port_id``
            of the VNF's links; ``None`` selects the first link to the
            first neighbour.
        metric: key into ``self.prom_metrics``; ``None`` falls back to
            ``'tx_packets'``.
        cookie: flow cookie stored with the entry and later used to query
            the flow statistics.

    Returns:
        A status or error string, or ``None`` when the neighbour of the VNF
        is not an ``OVSSwitch``.
    """
    graph = self.net.DCNetwork_graph
    not_found = "vnf switch of {0}:{1} not found!"
    flow_metric = {}

    # check if port is specified (vnf:port)
    if vnf_interface is None:
        # list() keeps the indexing valid when neighbors() yields an iterator
        neighbours = list(graph.neighbors(vnf_name))
        if not neighbours:
            logging.error(not_found.format(vnf_name, vnf_interface))
            return not_found.format(vnf_name, vnf_interface)
        # take first interface by default
        vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    flow_metric['vnf_name'] = vnf_name
    flow_metric['vnf_interface'] = vnf_interface

    vnf_switch = None
    for connected_sw in graph.neighbors(vnf_name):
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                vnf_switch = connected_sw
                flow_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if not vnf_switch:
        # logging.error, not logging.exception: no exception is active here
        logging.error(not_found.format(vnf_name, vnf_interface))
        return not_found.format(vnf_name, vnf_interface)

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return None

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0

        # the dpid attribute is a hex string; entries store it as an int
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        # the polling thread iterates this list under the same lock
        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        started = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(
            vnf_name, vnf_interface, metric, cookie)
        logging.info(started)
        return started

    except Exception as ex:
        logging.exception("setup_flow error.")
        return str(ex)
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
    """Stop monitoring one flow previously registered with setup_flow().

    Args:
        vnf_name: VNF name of the registered entry.
        vnf_interface: interface name of the registered entry.
        metric: metric key of the registered entry.
        cookie: flow cookie of the registered entry.

    Returns:
        A confirmation string, or ``None`` when no registered flow matches
        all four values.
    """
    # Search and removal both happen under the lock, because the polling
    # thread iterates self.flow_metrics under the same lock.
    with self.monitor_flow_lock:
        for flow_dict in self.flow_metrics:
            if flow_dict['vnf_name'] == vnf_name \
                    and flow_dict['vnf_interface'] == vnf_interface \
                    and flow_dict['metric_key'] == metric \
                    and flow_dict['cookie'] == cookie:
                break
        else:
            return None

        self.flow_metrics.remove(flow_dict)

        # The Prometheus client stores label values as strings, so the
        # cookie has to be stringified to match the stored label tuple.
        label_values = (vnf_name, vnf_interface, str(cookie))
        for collector in self.registry._collectors:
            if label_values in collector._metrics:
                collector.remove(*label_values)

        # Deletes the whole job group on the gateway; the polling threads
        # push the registry again on their next sample.
        delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

    stopped = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(
        vnf_name, vnf_interface, metric, cookie)
    logging.info(stopped)
    return stopped
# Register a port-level metric; this must be done before its measurement can start.
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Register a port counter of a VNF interface for periodic monitoring.

    Args:
        vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``.
        vnf_interface: interface name, matched against the ``src_port_id``
            of the VNF's links; ``None`` selects the first link to the
            first neighbour.
        metric: key into ``self.prom_metrics``; ``None`` falls back to
            ``'tx_packets'``.

    Returns:
        A status or error string, or ``None`` when the VNF has no single
        neighbouring ``OVSSwitch`` (nothing is registered in that case).
    """
    graph = self.net.DCNetwork_graph
    not_found = "vnf interface {0}:{1} not found!"
    network_metric = {}

    # check if port is specified (vnf:port)
    if vnf_interface is None:
        # list() keeps the indexing valid when neighbors() yields an iterator
        neighbours = list(graph.neighbors(vnf_name))
        if not neighbours:
            logging.error(not_found.format(vnf_name, vnf_interface))
            return not_found.format(vnf_name, vnf_interface)
        # take first interface by default
        vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    network_metric['vnf_name'] = vnf_name
    network_metric['vnf_interface'] = vnf_interface

    for connected_sw in graph.neighbors(vnf_name):
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                network_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if 'mon_port' not in network_metric:
        # logging.error, not logging.exception: no exception is active here
        logging.error(not_found.format(vnf_name, vnf_interface))
        return not_found.format(vnf_name, vnf_interface)

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        vnf_switch = list(graph.neighbors(str(vnf_name)))

        if len(vnf_switch) > 1:
            logging.info("vnf: {0} has multiple ports".format(vnf_name))
            return None
        if not vnf_switch:
            logging.info("vnf: {0} is not connected".format(vnf_name))
            return None

        next_node = self.net.getNodeByName(vnf_switch[0])

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return None

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0

        # the dpid attribute is a hex string; entries store it as an int
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        # the polling thread iterates this list under the same lock
        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        started = 'Started monitoring: {2} on {0}:{1}'.format(
            vnf_name, vnf_interface, metric)
        logging.info(started)
        return started

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """Stop monitoring one port metric, or every port metric of a VNF.

    Args:
        vnf_name: VNF name of the registered entries.
        vnf_interface: interface name of the entry to remove.
        metric: metric key of the entry to remove.

    When both ``vnf_interface`` and ``metric`` are ``None`` every entry of
    ``vnf_name`` is removed; otherwise only the first entry matching all
    three values is.

    Returns:
        A confirmation string, or ``None`` when nothing matched.
    """
    whole_vnf = vnf_interface is None and metric is None

    with self.monitor_lock:
        if whole_vnf:
            targets = [entry for entry in self.network_metrics
                       if entry['vnf_name'] == vnf_name]
        else:
            targets = [entry for entry in self.network_metrics
                       if entry['vnf_name'] == vnf_name
                       and entry['vnf_interface'] == vnf_interface
                       and entry['metric_key'] == metric][:1]
        if not targets:
            return None

        for entry in targets:
            self.network_metrics.remove(entry)

        # Unregistering a gauge would drop the complete metric with all its
        # labels, so only the label sets of this VNF are removed. Port
        # metrics are exported with flow_id None, which the Prometheus
        # client stores as the string 'None'.
        for collector in self.registry._collectors:
            if whole_vnf:
                # iterate over a snapshot: remove() mutates _metrics
                for name, interface, flow_id in list(collector._metrics):
                    if name == vnf_name and flow_id == 'None':
                        logging.info('3 name:{0} labels:{1} metrics:{2}'.format(
                            collector._name, collector._labelnames, collector._metrics))
                        collector.remove(name, interface, 'None')
            else:
                logging.info('{0}'.format(collector._metrics.values()))
                if (vnf_name, vnf_interface, 'None') in collector._metrics:
                    logging.info('2 name:{0} labels:{1} metrics:{2}'.format(
                        collector._name, collector._labelnames, collector._metrics))
                    collector.remove(vnf_name, vnf_interface, 'None')

        # 1 single monitor job for all metrics of the SDN controller: the
        # pushgateway can only delete by grouping key, not by metric label.
        # Removing metrics separately would need a separate grouping key
        # (and probably a separate registry) per metric.
        delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

    if whole_vnf:
        logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
        return 'Stopped monitoring: {0}'.format(vnf_name)

    stopped = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
    logging.info(stopped)
    return stopped
# Poll the statistics of every registered flow metric and export them to Prometheus.
def get_flow_metrics(self):
    """Poll the flow statistics of every registered flow until stopped.

    Runs as the target of the flow monitoring thread: while
    ``self.start_monitoring`` is true, each entry of ``self.flow_metrics``
    is queried through ``self.net.ryu_REST`` and the reply is handed to
    ``set_flow_metric``.
    """
    while self.start_monitoring:
        # 'with' guarantees the lock is released even when a query fails
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                data = {'cookie': flow_dict['cookie']}

                # tx flows are selected by ingress port, rx flows by egress port
                if 'tx' in flow_dict['metric_key']:
                    data['match'] = {'in_port': flow_dict['mon_port']}
                elif 'rx' in flow_dict['metric_key']:
                    data['out_port'] = flow_dict['mon_port']

                try:
                    # query Ryu
                    ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                    flow_stat_dict = ast.literal_eval(ret)

                    logging.debug('received flow stat:{0} '.format(flow_stat_dict))
                    self.set_flow_metric(flow_dict, flow_stat_dict)
                except Exception:
                    # one failed sample must not end the monitoring thread
                    logging.exception('flow monitoring of {0}:{1} failed'.format(
                        flow_dict.get('vnf_name'), flow_dict.get('vnf_interface')))

        # NOTE(review): the pause between polls is missing from the
        # extracted source; one second is assumed -- confirm.
        time.sleep(1)
def get_network_metrics(self):
    """Poll the port statistics of every registered metric until stopped.

    Runs as the target of the port monitoring thread: while
    ``self.start_monitoring`` is true, the port statistics of each switch
    that has registered metrics are fetched once through
    ``self.net.ryu_REST`` and handed to ``set_network_metric`` for every
    metric on that switch.
    """
    while self.start_monitoring:
        # 'with' guarantees the lock is released even when a query fails
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            metrics_by_dpid = {}
            for metric_dict in self.network_metrics:
                metrics_by_dpid.setdefault(int(metric_dict['switch_dpid']), []).append(metric_dict)

            for dpid, metric_list in metrics_by_dpid.items():
                try:
                    # query Ryu
                    ret = self.net.ryu_REST('stats/port', dpid=dpid)
                    port_stat_dict = ast.literal_eval(ret)

                    for metric_dict in metric_list:
                        self.set_network_metric(metric_dict, port_stat_dict)
                except Exception:
                    # one failed sample must not end the monitoring thread
                    logging.exception('port monitoring of switch {0} failed'.format(dpid))

        # NOTE(review): the pause between polls is missing from the
        # extracted source; one second is assumed -- confirm.
        time.sleep(1)
# Parse the Ryu port-stats reply for one registered metric and export its value to Prometheus.
def set_network_metric(self, metric_dict, port_stat_dict):
    """Export one port counter to Prometheus and update its rate bookkeeping.

    Args:
        metric_dict: entry of ``self.network_metrics``; its
            ``previous_measurement`` and ``previous_monitor_time`` are
            updated in place.
        port_stat_dict: parsed port-stats reply, keyed by the switch dpid
            as a string, each value a list of per-port dicts.

    Returns:
        The counter rate since the previous sample, ``None`` for the first
        sample of a port (or after its uptime went backwards), or an error
        string when the monitored port is not part of the reply.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = metric_dict['mon_port']

    for port_stat in port_stat_dict[str(switch_dpid)]:
        try:
            is_monitored_port = int(port_stat['port_no']) == int(mon_port)
        except (TypeError, ValueError):
            # a port number that is not numeric cannot be the monitored port
            is_monitored_port = False
        if not is_monitored_port:
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric
        self.prom_metrics[metric_dict['metric_key']].\
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\
            set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

        # also the rate is calculated here, but not used for now
        # (rate can be easily queried from prometheus also)
        previous_monitor_time = metric_dict['previous_monitor_time']
        if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:
            # First sample: there is no earlier sample to form a rate with.
            # The previous code released the caller's lock here and
            # re-entered the polling loop, nesting one loop per new metric.
            metric_rate = None
        else:
            time_delta = port_uptime - previous_monitor_time
            metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)

        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return metric_rate

    # logging.error, not logging.exception: no exception is active here
    message = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
    logging.error(message)
    return message
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """Export one flow counter to Prometheus.

    Args:
        metric_dict: entry of ``self.flow_metrics``.
        flow_stat_dict: parsed flow-stats reply, keyed by the switch dpid
            as a string, each value a list of per-flow dicts.

    Returns:
        None. Nothing is exported when the reply holds no flow for the
        switch or when the metric key names neither bytes nor packets.
    """
    metric_key = metric_dict['metric_key']
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    cookie = metric_dict['cookie']

    flow_stats = flow_stat_dict.get(str(switch_dpid))
    if not flow_stats:
        # no flow matched (yet); indexing [0] here used to raise IndexError
        logging.debug('no flow stats for flow:{0} on {1}:{2}'.format(cookie, vnf_name, vnf_interface))
        return None

    # TODO aggregate all found flow stats
    flow_stat = flow_stats[0]
    if 'bytes' in metric_key:
        counter = flow_stat['byte_count']
    elif 'packet' in metric_key:
        counter = flow_stat['packet_count']
    else:
        # previously 'counter' stayed unbound for such a key
        logging.error('unsupported flow metric {0} on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))
        return None

    # NOTE(review): the closing .set(...) call is missing from the extracted
    # source; setting the gauge to the selected counter is assumed.
    self.prom_metrics[metric_key]. \
        labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}). \
        set(counter)

    # 1 single monitor job for all metrics of the SDN controller
    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
def start_Prometheus(self, port=9090):
    """Start a Prometheus container and return its ``Popen`` handle.

    Args:
        port: host port mapped to port 9090 of the container.
    """
    # prometheus.yml configuration file is located in the same directory as this file
    config_dir = os.path.dirname(os.path.abspath(__file__))
    # NOTE(review): the list opening ("docker", "run", "--rm"), the image
    # name and the return statement are missing from the extracted source
    # and are assumed here -- confirm before relying on this method.
    cmd = ["docker",
           "run",
           "--rm",
           "-p", "{0}:9090".format(port),
           "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(config_dir),
           "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(config_dir),
           "--name", "prometheus",
           "prom/prometheus"
           ]
    logging.info('Start Prometheus container {0}'.format(cmd))
    return Popen(cmd)
def start_PushGateway(self, port=9091):
    """Start a Prometheus Push Gateway container and return its ``Popen`` handle.

    Args:
        port: host port mapped to port 9091 of the container.
    """
    # NOTE(review): the list opening ("docker", "run", "-d"), the image
    # name and the return statement are missing from the extracted source
    # and are assumed here -- confirm before relying on this method.
    cmd = ["docker",
           "run",
           "-d",
           "-p", "{0}:9091".format(port),
           "--name", "pushgateway",
           "prom/pushgateway"
           ]

    logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
    return Popen(cmd)
def start_cadvisor(self, port=8090):
    """Start a cAdvisor container and return its ``Popen`` handle.

    Args:
        port: host port published to port 8080 of the container.
    """
    # NOTE(review): the list opening ("docker", "run", "--rm"), the
    # container name and the return statement are missing from the
    # extracted source. The name 'cadvisor' matches the container that the
    # shutdown code stops; the rest is assumed -- confirm.
    cmd = ["docker",
           "run",
           "--rm",
           "--volume=/:/rootfs:ro",
           "--volume=/var/run:/var/run:rw",
           "--volume=/sys:/sys:ro",
           "--volume=/var/lib/docker/:/var/lib/docker:ro",
           "--publish={0}:8080".format(port),
           "--name=cadvisor",
           "google/cadvisor:latest"
           ]
    logging.info('Start cAdvisor container {0}'.format(cmd))
    return Popen(cmd)
# NOTE(review): the ``def`` line of this method is missing from the extracted
# source; the name ``stop`` is assumed -- confirm against the callers.
def stop(self):
    """Stop both monitoring threads and shut down the cAdvisor container."""
    # stop the monitoring thread: both polling loops exit once the flag is cleared
    self.start_monitoring = False
    self.monitor_thread.join()
    self.monitor_flow_thread.join()

    # Prometheus and the pushgateway are started outside of son-emu, so
    # their shutdown stays disabled:
    #   if self.prometheus_process is not None:
    #       logging.info('stopping prometheus container')
    #       self.prometheus_process.terminate()
    #       self.prometheus_process.kill()
    #       self._stop_container('prometheus')
    #
    #   if self.pushgateway_process is not None:
    #       logging.info('stopping pushgateway container')
    #       self.pushgateway_process.terminate()
    #       self.pushgateway_process.kill()
    #       self._stop_container('pushgateway')

    if self.cadvisor_process is not None:
        logging.info('stopping cadvisor container')
        self.cadvisor_process.terminate()
        self.cadvisor_process.kill()
        self._stop_container('cadvisor')
def switch_tx_rx(self, metric=''):
    """Return ``metric`` with its direction swapped ('tx' <-> 'rx').

    When monitoring vnfs, the tx of the datacenter switch is actually the
    rx of the vnf, so the metric name is changed to be consistent with the
    vnf rx or tx.

    Args:
        metric: metric key such as ``'tx_packets'``.

    Returns:
        The key with 'tx' replaced by 'rx' or vice versa; keys containing
        neither are returned unchanged.
    """
    # The branches must be exclusive: running both replaces in sequence
    # would map every key to its 'tx' form.
    if 'tx' in metric:
        return metric.replace('tx', 'rx')
    if 'rx' in metric:
        return metric.replace('rx', 'tx')
    return metric
523 def _stop_container(self
, name
):