"""
Copyright (c) 2015 SONATA-NFV

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.

This work has been performed in the framework of the SONATA project,
funded by the European Commission under Grant number 671517 through
the Horizon 2020 and 5G-PPP programmes. The authors would like to
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
"""
import ast
import logging
import os
import threading
import time
import urllib2
from subprocess import Popen, PIPE

import paramiko
from mininet.node import OVSSwitch
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
    pushadd_to_gateway, push_to_gateway, delete_from_gateway
43 logging
.basicConfig(level
=logging
.INFO
)
class DCNetworkMonitor():
    """
    Class to read openflow stats from the Ryu controller of the DCNetwork
    and export them to Prometheus via the push gateway.
    """

    def __init__(self, net):
        """
        :param net: the DCNetwork object whose Ryu controller and topology
            graph (DCNetwork_graph) are queried for the monitored statistics.
        """
        self.net = net

        # REST endpoint of the Prometheus server used by query_Prometheus()
        # NOTE(review): hard-coded defaults — confirm they should not come from `net`
        prometheus_ip = '127.0.0.1'
        prometheus_port = '9090'
        self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)

        # helper variables to calculate the metrics
        # address of the Prometheus push gateway all metrics are exported to
        self.pushgateway = 'localhost:9091'

        # Start up the server to expose the metrics to Prometheus.
        #start_http_server(8000)

        # supported Prometheus metrics
        self.registry = CollectorRegistry()
        self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                          ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
        self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                          ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
        self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                        ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
        self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                        ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)

        # map the metric keys used by the Ryu stats REST API to the gauges above
        self.prom_metrics = {'tx_packets': self.prom_tx_packet_count,
                             'rx_packets': self.prom_rx_packet_count,
                             'tx_bytes': self.prom_tx_byte_count,
                             'rx_bytes': self.prom_rx_byte_count}

        # list of installed metrics to monitor
        # each entry can contain this data:
        #   {switch_dpid, vnf_name, vnf_interface, mon_port, metric_key,
        #    previous_measurement = 0,
        #    previous_monitor_time = 0}
        self.monitor_lock = threading.Lock()
        self.monitor_flow_lock = threading.Lock()
        self.network_metrics = []
        self.flow_metrics = []

        # start monitoring threads (one for port stats, one for flow stats)
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
        self.monitor_flow_thread.start()

        # helper containers; prometheus and pushgateway are started outside of son-emu
        #self.pushgateway_process = self.start_PushGateway()
        #self.prometheus_process = self.start_Prometheus()
        self.cadvisor_process = self.start_cadvisor()
108 # first set some parameters, before measurement can start
109 def setup_flow(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets', cookie
=0):
113 # check if port is specified (vnf:port)
114 if vnf_interface
is None:
115 # take first interface by default
116 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
117 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
118 vnf_interface
= link_dict
[0]['src_port_id']
120 flow_metric
['vnf_name'] = vnf_name
121 flow_metric
['vnf_interface'] = vnf_interface
124 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
125 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
126 for link
in link_dict
:
127 # logging.info("{0},{1}".format(link_dict[link],vnf_interface))
128 if link_dict
[link
]['src_port_id'] == vnf_interface
:
129 # found the right link and connected switch
130 # logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface))
131 vnf_switch
= connected_sw
132 flow_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
136 logging
.exception("vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
))
137 return "vnf switch of {0}:{1} not found!".format(vnf_name
, vnf_interface
)
140 # default port direction to monitor
142 metric
= 'tx_packets'
144 next_node
= self
.net
.getNodeByName(vnf_switch
)
146 if not isinstance(next_node
, OVSSwitch
):
147 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
150 flow_metric
['previous_measurement'] = 0
151 flow_metric
['previous_monitor_time'] = 0
153 flow_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
154 flow_metric
['metric_key'] = metric
155 flow_metric
['cookie'] = cookie
157 self
.monitor_flow_lock
.acquire()
158 self
.flow_metrics
.append(flow_metric
)
159 self
.monitor_flow_lock
.release()
161 logging
.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
162 return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
164 except Exception as ex
:
165 logging
.exception("setup_metric error.")
168 def stop_flow(self
, vnf_name
, vnf_interface
=None, metric
=None, cookie
=0):
169 for flow_dict
in self
.flow_metrics
:
170 if flow_dict
['vnf_name'] == vnf_name
and flow_dict
['vnf_interface'] == vnf_interface \
171 and flow_dict
['metric_key'] == metric
and flow_dict
['cookie'] == cookie
:
173 self
.monitor_flow_lock
.acquire()
175 self
.flow_metrics
.remove(flow_dict
)
177 for collector
in self
.registry
._collectors
:
178 if (vnf_name
, vnf_interface
, cookie
) in collector
._metrics
:
179 #logging.info('2 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames,
180 # collector._metrics))
181 collector
.remove(vnf_name
, vnf_interface
, cookie
)
183 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
185 self
.monitor_flow_lock
.release()
187 logging
.info('Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
))
188 return 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
, cookie
)
191 # first set some parameters, before measurement can start
192 def setup_metric(self
, vnf_name
, vnf_interface
=None, metric
='tx_packets'):
196 # check if port is specified (vnf:port)
197 if vnf_interface
is None:
198 # take first interface by default
199 connected_sw
= self
.net
.DCNetwork_graph
.neighbors(vnf_name
)[0]
200 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
201 vnf_interface
= link_dict
[0]['src_port_id']
203 network_metric
['vnf_name'] = vnf_name
204 network_metric
['vnf_interface'] = vnf_interface
206 for connected_sw
in self
.net
.DCNetwork_graph
.neighbors(vnf_name
):
207 link_dict
= self
.net
.DCNetwork_graph
[vnf_name
][connected_sw
]
208 for link
in link_dict
:
209 # logging.info("{0},{1}".format(link_dict[link],vnf_interface))
210 if link_dict
[link
]['src_port_id'] == vnf_interface
:
211 # found the right link and connected switch
212 # logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface))
213 network_metric
['mon_port'] = link_dict
[link
]['dst_port_nr']
216 if 'mon_port' not in network_metric
:
217 logging
.exception("vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
))
218 return "vnf interface {0}:{1} not found!".format(vnf_name
,vnf_interface
)
221 # default port direction to monitor
223 metric
= 'tx_packets'
225 vnf_switch
= self
.net
.DCNetwork_graph
.neighbors(str(vnf_name
))
227 if len(vnf_switch
) > 1:
228 logging
.info("vnf: {0} has multiple ports".format(vnf_name
))
230 elif len(vnf_switch
) == 0:
231 logging
.info("vnf: {0} is not connected".format(vnf_name
))
234 vnf_switch
= vnf_switch
[0]
235 next_node
= self
.net
.getNodeByName(vnf_switch
)
237 if not isinstance(next_node
, OVSSwitch
):
238 logging
.info("vnf: {0} is not connected to switch".format(vnf_name
))
241 network_metric
['previous_measurement'] = 0
242 network_metric
['previous_monitor_time'] = 0
245 network_metric
['switch_dpid'] = int(str(next_node
.dpid
), 16)
246 network_metric
['metric_key'] = metric
248 self
.monitor_lock
.acquire()
250 self
.network_metrics
.append(network_metric
)
251 self
.monitor_lock
.release()
254 logging
.info('Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
255 return 'Started monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
257 except Exception as ex
:
258 logging
.exception("setup_metric error.")
261 def stop_metric(self
, vnf_name
, vnf_interface
=None, metric
=None):
263 for metric_dict
in self
.network_metrics
:
264 #logging.info('start Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric_dict))
265 if metric_dict
['vnf_name'] == vnf_name
and metric_dict
['vnf_interface'] == vnf_interface \
266 and metric_dict
['metric_key'] == metric
:
268 self
.monitor_lock
.acquire()
270 self
.network_metrics
.remove(metric_dict
)
272 #this removes the complete metric, all labels...
273 #REGISTRY.unregister(self.prom_metrics[metric_dict['metric_key']])
274 #self.registry.unregister(self.prom_metrics[metric_dict['metric_key']])
276 for collector
in self
.registry
._collectors
:
277 #logging.info('name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics))
279 INFO:root:name:sonemu_rx_count_packets
280 labels:('vnf_name', 'vnf_interface')
281 metrics:{(u'tsrc', u'output'): < prometheus_client.core.Gauge
286 logging
.info('{0}'.format(collector
._metrics
.values()))
287 #if self.prom_metrics[metric_dict['metric_key']]
288 if (vnf_name
, vnf_interface
, 'None') in collector
._metrics
:
289 logging
.info('2 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
291 #collector._metrics = {}
292 collector
.remove(vnf_name
, vnf_interface
, 'None')
294 # set values to NaN, prometheus api currently does not support removal of metrics
295 #self.prom_metrics[metric_dict['metric_key']].labels(vnf_name, vnf_interface).set(float('nan'))
297 # this removes the complete metric, all labels...
298 # 1 single monitor job for all metrics of the SDN controller
299 # we can only remove from the pushgateway grouping keys(labels) which we have defined for the add_to_pushgateway
300 # we can not specify labels from the metrics to be removed
301 # if we need to remove the metrics seperatelty, we need to give them a separate grouping key, and probably a diffferent registry also
302 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
304 self
.monitor_lock
.release()
306 logging
.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
))
307 return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name
, vnf_interface
, metric
)
309 # delete everything from this vnf
310 elif metric_dict
['vnf_name'] == vnf_name
and vnf_interface
is None and metric
is None:
311 self
.monitor_lock
.acquire()
312 self
.network_metrics
.remove(metric_dict
)
313 for collector
in self
.registry
._collectors
:
314 collector_dict
= collector
._metrics
.copy()
315 for name
, interface
, id in collector_dict
:
317 logging
.info('3 name:{0} labels:{1} metrics:{2}'.format(collector
._name
, collector
._labelnames
,
319 collector
.remove(name
, interface
, 'None')
321 delete_from_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller')
322 self
.monitor_lock
.release()
323 logging
.info('Stopped monitoring vnf: {0}'.format(vnf_name
))
324 return 'Stopped monitoring: {0}'.format(vnf_name
)
327 # get all metrics defined in the list and export it to Prometheus
328 def get_flow_metrics(self
):
329 while self
.start_monitoring
:
331 self
.monitor_flow_lock
.acquire()
333 for flow_dict
in self
.flow_metrics
:
336 data
['cookie'] = flow_dict
['cookie']
338 if 'tx' in flow_dict
['metric_key']:
339 data
['match'] = {'in_port':flow_dict
['mon_port']}
340 elif 'rx' in flow_dict
['metric_key']:
341 data
['out_port'] = flow_dict
['mon_port']
345 ret
= self
.net
.ryu_REST('stats/flow', dpid
=flow_dict
['switch_dpid'], data
=data
)
346 flow_stat_dict
= ast
.literal_eval(ret
)
348 #logging.info('received flow stat:{0} '.format(flow_stat_dict))
349 self
.set_flow_metric(flow_dict
, flow_stat_dict
)
351 self
.monitor_flow_lock
.release()
354 def get_network_metrics(self
):
355 while self
.start_monitoring
:
357 self
.monitor_lock
.acquire()
359 # group metrics by dpid to optimize the rest api calls
360 dpid_list
= [metric_dict
['switch_dpid'] for metric_dict
in self
.network_metrics
]
361 dpid_set
= set(dpid_list
)
363 for dpid
in dpid_set
:
366 ret
= self
.net
.ryu_REST('stats/port', dpid
=dpid
)
367 port_stat_dict
= ast
.literal_eval(ret
)
369 metric_list
= [metric_dict
for metric_dict
in self
.network_metrics
370 if int(metric_dict
['switch_dpid'])==int(dpid
)]
371 #logging.info('1set prom packets:{0} '.format(self.network_metrics))
372 for metric_dict
in metric_list
:
373 self
.set_network_metric(metric_dict
, port_stat_dict
)
375 self
.monitor_lock
.release()
378 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
379 def set_network_metric(self
, metric_dict
, port_stat_dict
):
380 # vnf tx is the datacenter switch rx and vice-versa
381 metric_key
= self
.switch_tx_rx(metric_dict
['metric_key'])
382 switch_dpid
= metric_dict
['switch_dpid']
383 vnf_name
= metric_dict
['vnf_name']
384 vnf_interface
= metric_dict
['vnf_interface']
385 previous_measurement
= metric_dict
['previous_measurement']
386 previous_monitor_time
= metric_dict
['previous_monitor_time']
387 mon_port
= metric_dict
['mon_port']
389 for port_stat
in port_stat_dict
[str(switch_dpid
)]:
390 if int(port_stat
['port_no']) == int(mon_port
):
391 port_uptime
= port_stat
['duration_sec'] + port_stat
['duration_nsec'] * 10 ** (-9)
392 this_measurement
= int(port_stat
[metric_key
])
393 #logging.info('set prom packets:{0} {1}:{2}'.format(this_measurement, vnf_name, vnf_interface))
395 # set prometheus metric
396 self
.prom_metrics
[metric_dict
['metric_key']].\
397 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': None}).\
398 set(this_measurement
)
399 #push_to_gateway(self.pushgateway, job='SDNcontroller',
400 # grouping_key={'metric':metric_dict['metric_key']}, registry=self.registry)
402 # 1 single monitor job for all metrics of the SDN controller
403 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
405 if previous_monitor_time
<= 0 or previous_monitor_time
>= port_uptime
:
406 metric_dict
['previous_measurement'] = int(port_stat
[metric_key
])
407 metric_dict
['previous_monitor_time'] = port_uptime
408 # do first measurement
409 #logging.info('first measurement')
411 self
.monitor_lock
.release()
413 metric_rate
= self
.get_network_metrics()
417 time_delta
= (port_uptime
- metric_dict
['previous_monitor_time'])
418 metric_rate
= (this_measurement
- metric_dict
['previous_measurement']) / float(time_delta
)
419 #logging.info('metric: {0} rate:{1}'.format(metric_dict['metric_key'], metric_rate))
421 metric_dict
['previous_measurement'] = this_measurement
422 metric_dict
['previous_monitor_time'] = port_uptime
425 logging
.exception('metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
))
426 return 'metric {0} not found on {1}:{2}'.format(metric_key
, vnf_name
, vnf_interface
)
428 def set_flow_metric(self
, metric_dict
, flow_stat_dict
):
429 # vnf tx is the datacenter switch rx and vice-versa
430 #metric_key = self.switch_tx_rx(metric_dict['metric_key'])
431 metric_key
= metric_dict
['metric_key']
432 switch_dpid
= metric_dict
['switch_dpid']
433 vnf_name
= metric_dict
['vnf_name']
434 vnf_interface
= metric_dict
['vnf_interface']
435 previous_measurement
= metric_dict
['previous_measurement']
436 previous_monitor_time
= metric_dict
['previous_monitor_time']
437 cookie
= metric_dict
['cookie']
439 # TODO aggregate all found flow stats
440 flow_stat
= flow_stat_dict
[str(switch_dpid
)][0]
441 if 'bytes' in metric_key
:
442 counter
= flow_stat
['byte_count']
443 elif 'packet' in metric_key
:
444 counter
= flow_stat
['packet_count']
446 flow_uptime
= flow_stat
['duration_sec'] + flow_stat
['duration_nsec'] * 10 ** (-9)
448 self
.prom_metrics
[metric_dict
['metric_key']]. \
449 labels({'vnf_name': vnf_name
, 'vnf_interface': vnf_interface
, 'flow_id': cookie
}). \
451 pushadd_to_gateway(self
.pushgateway
, job
='sonemu-SDNcontroller', registry
=self
.registry
)
453 #logging.exception('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))
454 #return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
456 def query_Prometheus(self
, query
):
459 for old in escaped_chars:
460 new = '\{0}'.format(old)
461 query = query.replace(old, new)
463 url
= self
.prometheus_REST_api
+ '/' + 'api/v1/query?query=' + query
464 #logging.info('query:{0}'.format(url))
465 req
= urllib2
.Request(url
)
466 ret
= urllib2
.urlopen(req
).read()
467 ret
= ast
.literal_eval(ret
)
468 if ret
['status'] == 'success':
469 #logging.info('return:{0}'.format(ret))
471 ret
= ret
['data']['result'][0]['value']
478 def start_Prometheus(self
, port
=9090):
479 # prometheus.yml configuration file is located in the same directory as this file
483 "-p", "{0}:9090".format(port
),
484 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
485 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
486 "--name", "prometheus",
489 logging
.info('Start Prometheus container {0}'.format(cmd
))
492 def start_PushGateway(self
, port
=9091):
496 "-p", "{0}:9091".format(port
),
497 "--name", "pushgateway",
501 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
504 def start_cadvisor(self
, port
=8090):
508 "--volume=/:/rootfs:ro",
509 "--volume=/var/run:/var/run:rw",
510 "--volume=/sys:/sys:ro",
511 "--volume=/var/lib/docker/:/var/lib/docker:ro",
512 "--publish={0}:8080".format(port
),
514 "google/cadvisor:latest"
516 logging
.info('Start cAdvisor container {0}'.format(cmd
))
520 # stop the monitoring thread
521 self
.start_monitoring
= False
522 self
.monitor_thread
.join()
523 self
.monitor_flow_thread
.join()
526 if self.prometheus_process is not None:
527 logging.info('stopping prometheus container')
528 self.prometheus_process.terminate()
529 self.prometheus_process.kill()
530 self._stop_container('prometheus')
532 if self.pushgateway_process is not None:
533 logging.info('stopping pushgateway container')
534 self.pushgateway_process.terminate()
535 self.pushgateway_process.kill()
536 self._stop_container('pushgateway')
539 if self
.cadvisor_process
is not None:
540 logging
.info('stopping cadvisor container')
541 self
.cadvisor_process
.terminate()
542 self
.cadvisor_process
.kill()
543 self
._stop
_container
('cadvisor')
545 def switch_tx_rx(self
,metric
=''):
546 # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
547 # so we need to change the metric name to be consistent with the vnf rx or tx
549 metric
= metric
.replace('tx','rx')
551 metric
= metric
.replace('rx','tx')
555 def _stop_container(self
, name
):
566 def profile(self
, mgmt_ip
, rate
, input_ip
, vnf_uuid
):
568 ssh
= paramiko
.SSHClient()
569 ssh
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
570 #ssh.connect(mgmt_ip, username='steven', password='test')
571 ssh
.connect(mgmt_ip
, username
='root', password
='root')
573 iperf_cmd
= 'iperf -c {0} -u -l18 -b{1}M -t1000 &'.format(input_ip
, rate
)
575 stdin
, stdout
, stderr
= ssh
.exec_command(iperf_cmd
)
577 start_time
= time
.time()
578 query_cpu
= '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid
, 1)
579 while (time
.time() - start_time
) < 15:
580 data
= self
.query_Prometheus(query_cpu
)
581 # logging.info('rate: {1} data:{0}'.format(data, rate))
585 query_cpu2
= '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid
, 8)
586 cpu_load
= float(self
.query_Prometheus(query_cpu2
)[1])
587 output
= 'rate: {1}Mbps; cpu_load: {0}%'.format(round(cpu_load
* 100, 2), rate
)
589 logging
.info(output_line
)
591 stop_iperf
= 'pkill -9 iperf'
592 stdin
, stdout
, stderr
= ssh
.exec_command(stop_iperf
)