2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written permission.
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
import ast
import logging
import os
import threading
import time
from subprocess import Popen

from mininet.node import OVSSwitch
from prometheus_client import (
    start_http_server,
    Summary,
    Histogram,
    Gauge,
    Counter,
    REGISTRY,
    CollectorRegistry,
    pushadd_to_gateway,
    push_to_gateway,
    delete_from_gateway,
)
# Module-level side effect: configure the root logger at INFO level as soon
# as this module is imported.
# NOTE(review): configuring logging on import affects the whole process;
# consider moving this to the application entry point.
40 logging
.basicConfig(level
=logging
.INFO
)
43 Class to read OpenFlow stats from the Ryu controller of the DCNetwork.
46 class DCNetworkMonitor():
def __init__(self, net):
    """Create the Prometheus gauges and start the two Ryu polling threads.

    :param net: the DCNetwork whose ``DCNetwork_graph``, ``getNodeByName``
        and ``ryu_REST`` are used by the monitoring methods.
    """
    # every other method reads the topology and queries Ryu through self.net
    self.net = net

    # TODO: these global variables should be part of a config file?
    # prometheus is started outside of son-emu
    prometheus_ip = '127.0.0.1'
    prometheus_port = '9090'
    self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)

    # pushgateway is started outside of son-emu and son-emu is started with net=host
    # so localhost:9091 works
    self.pushgateway = 'localhost:9091'
    # when sdk is started with docker-compose, we could use
    # self.pushgateway = 'pushgateway:9091'

    # supported Prometheus metrics; they live in a dedicated registry, which is
    # the registry pushed to the pushgateway
    self.registry = CollectorRegistry()
    label_names = ['vnf_name', 'vnf_interface', 'flow_id']
    self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                      label_names, registry=self.registry)
    self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                      label_names, registry=self.registry)
    self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                    label_names, registry=self.registry)
    self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                    label_names, registry=self.registry)

    # metric key (as passed to setup_metric / setup_flow) -> gauge
    self.prom_metrics = {'tx_packets': self.prom_tx_packet_count,
                         'rx_packets': self.prom_rx_packet_count,
                         'tx_bytes': self.prom_tx_byte_count,
                         'rx_bytes': self.prom_rx_byte_count}

    # lists of installed metrics to monitor, each guarded by its own lock;
    # every entry is a dict with vnf_name, vnf_interface, mon_port,
    # switch_dpid, metric_key, previous_measurement and previous_monitor_time
    # (flow entries additionally carry the cookie)
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()
    self.network_metrics = []
    self.flow_metrics = []

    # Prometheus pushgateway, DB and cAdvisor are started as external containers,
    # outside of son-emu, so no container handles are owned here; initialising
    # them keeps the "is not None" checks of the shutdown code safe.
    # (to start them from here: self.start_PushGateway(), self.start_Prometheus(),
    #  self.start_cadvisor())
    self.pushgateway_process = None
    self.prometheus_process = None
    self.cadvisor_process = None

    # start monitoring threads; they poll until start_monitoring is cleared
    self.start_monitoring = True
    self.monitor_thread = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread.start()

    self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread.start()
112 # first set some parameters, before measurement can start
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """Start exporting OpenFlow flow counters for a VNF interface.

    The flow is polled by get_flow_metrics and exported with the
    ``flow_id`` label set to ``cookie``.

    :param vnf_name: name of the VNF node in ``DCNetwork_graph``.
    :param vnf_interface: VNF interface (``src_port_id`` of its link); when
        None, the first link to the first neighbouring node is used.
    :param metric: one of the ``prom_metrics`` keys; None falls back to
        ``'tx_packets'``.
    :param cookie: OpenFlow cookie selecting the flow(s) to monitor.
    :return: a human-readable status string (also on failure).
    """
    flow_metric = {}
    try:
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            connected_sw = list(self.net.DCNetwork_graph.neighbors(vnf_name))[0]
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            vnf_interface = link_dict[0]['src_port_id']

        flow_metric['vnf_name'] = vnf_name
        flow_metric['vnf_interface'] = vnf_interface

        # find the switch and switch port this interface is linked to
        vnf_switch = None
        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    vnf_switch = connected_sw
                    flow_metric['mon_port'] = link_dict[link]['dst_port_nr']
                    break
            if vnf_switch is not None:
                break

        if vnf_switch is None:
            logging.error("vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface))
            return "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)

        # default metric to monitor (only when the caller passed None)
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)
        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return "vnf: {0} is not connected to switch".format(vnf_name)

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0
        # the switch dpid is parsed as a hex string into the integer used for Ryu queries
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        logging.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie))
        return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)

    except Exception as ex:
        logging.exception("setup_flow error.")
        return str(ex)
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
    """Stop exporting a flow metric registered with setup_flow.

    :param vnf_name: name of the VNF.
    :param vnf_interface: VNF interface; when None and ``metric`` is given,
        the first interface of the VNF is used.
    :param metric: metric key that was passed to setup_flow.
    :param cookie: cookie that was passed to setup_flow.
    :return: a human-readable status string.
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        connected_sw = list(self.net.DCNetwork_graph.neighbors(vnf_name))[0]
        link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
        vnf_interface = link_dict[0]['src_port_id']

    # search and modify the list under the lock the polling thread also uses;
    # ``with`` releases it even if the pushgateway call raises
    with self.monitor_flow_lock:
        for flow_dict in self.flow_metrics:
            if (flow_dict['vnf_name'] == vnf_name
                    and flow_dict['vnf_interface'] == vnf_interface
                    and flow_dict['metric_key'] == metric
                    and flow_dict['cookie'] == cookie):
                self.flow_metrics.remove(flow_dict)

                # Label values are stored as strings (a None flow_id becomes 'None'),
                # so the int cookie cannot be looked up directly in the label keys;
                # remove() converts the values the same way labels() did.
                # KeyError only means no sample was exported for this gauge.
                for collector in self.prom_metrics.values():
                    try:
                        collector.remove(vnf_name, vnf_interface, cookie)
                    except KeyError:
                        pass

                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

                logging.info('Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie))
                return 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)

    return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
201 # first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Start exporting a switch-port counter for a VNF interface.

    The counter of the datacenter switch port the interface is linked to is
    polled by get_network_metrics and exported with ``flow_id`` 'None'.

    :param vnf_name: name of the VNF node in ``DCNetwork_graph``.
    :param vnf_interface: VNF interface (``src_port_id`` of its link); when
        None, the first link to the first neighbouring node is used.
    :param metric: one of the ``prom_metrics`` keys, seen from the VNF;
        None falls back to ``'tx_packets'``.
    :return: a human-readable status string (also on failure).
    """
    network_metric = {}
    try:
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            connected_sw = list(self.net.DCNetwork_graph.neighbors(vnf_name))[0]
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            vnf_interface = link_dict[0]['src_port_id']

        network_metric['vnf_name'] = vnf_name
        network_metric['vnf_interface'] = vnf_interface

        # Find the switch and switch port this interface is linked to. The dpid
        # must come from the same link as mon_port: picking the first neighbour
        # polls the wrong switch for VNFs linked to several switches.
        vnf_switch = None
        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    vnf_switch = connected_sw
                    network_metric['mon_port'] = link_dict[link]['dst_port_nr']
                    break
            if vnf_switch is not None:
                break

        if vnf_switch is None:
            logging.error("vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface))
            return "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)

        # default port direction to monitor (only when the caller passed None)
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)
        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return "vnf: {0} is not connected to switch".format(vnf_name)

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0
        # the switch dpid is parsed as a hex string into the integer used for Ryu queries
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        logging.info('Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric))
        return 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """Stop exporting switch-port metrics of a VNF.

    Two modes are supported:

    * ``metric`` given: stop that metric on ``vnf_interface`` (default: the
      first interface of the VNF).
    * ``vnf_interface`` and ``metric`` both None: stop every port metric
      registered for ``vnf_name``.

    The matching samples are removed from the gauges and the
    ``sonemu-SDNcontroller`` job is deleted from the pushgateway (one job holds
    all SDN-controller metrics, so it cannot be deleted per label); metrics
    that are still monitored are pushed again by the next polling cycle.

    :return: a human-readable status string.
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        connected_sw = list(self.net.DCNetwork_graph.neighbors(vnf_name))[0]
        link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
        vnf_interface = link_dict[0]['src_port_id']

    # ``with`` releases the lock even if the pushgateway call raises
    with self.monitor_lock:
        if vnf_interface is None and metric is None:
            # delete everything from this vnf (and only this vnf)
            vnf_metrics = [m for m in self.network_metrics if m['vnf_name'] == vnf_name]
            if vnf_metrics:
                for metric_dict in vnf_metrics:
                    self.network_metrics.remove(metric_dict)
                # port metrics carry flow_id 'None'; other VNFs and flow metrics stay
                for collector in self.prom_metrics.values():
                    for labels in list(collector._metrics):
                        if labels[0] == vnf_name and labels[2] == 'None':
                            collector.remove(*labels)
                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
                logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
                return 'Stopped monitoring: {0}'.format(vnf_name)
        else:
            for metric_dict in self.network_metrics:
                if (metric_dict['vnf_name'] == vnf_name
                        and metric_dict['vnf_interface'] == vnf_interface
                        and metric_dict['metric_key'] == metric):
                    self.network_metrics.remove(metric_dict)
                    # port metrics are exported with flow_id None, stored as 'None';
                    # KeyError only means no sample was exported for this gauge
                    for collector in self.prom_metrics.values():
                        try:
                            collector.remove(vnf_name, vnf_interface, 'None')
                        except KeyError:
                            pass
                    delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
                    logging.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric))
                    return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)

    return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
345 # get all metrics defined in the list and export it to Prometheus
def get_flow_metrics(self, interval=1.0):
    """Poll Ryu flow statistics for all registered flow metrics.

    Runs until ``start_monitoring`` is cleared. Each cycle issues one
    ``stats/flow`` query per entry of ``flow_metrics`` and hands the parsed
    reply to set_flow_metric.

    :param interval: seconds to sleep between polling cycles.
    """
    while self.start_monitoring:
        # ``with`` releases the lock even when a query fails, so setup_flow and
        # stop_flow can never block on a failed polling cycle
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                # select the flows by cookie and by the switch port of the VNF link:
                # vnf tx enters the switch on mon_port, vnf rx leaves it there
                # NOTE(review): no cookie_mask is sent; confirm how ryu_REST / Ryu
                # filter on the cookie alone
                data = {'cookie': flow_dict['cookie']}
                if 'tx' in flow_dict['metric_key']:
                    data['match'] = {'in_port': flow_dict['mon_port']}
                elif 'rx' in flow_dict['metric_key']:
                    data['out_port'] = flow_dict['mon_port']

                try:
                    ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                    flow_stat_dict = ast.literal_eval(ret)
                    logging.debug('received flow stat:{0} '.format(flow_stat_dict))
                    self.set_flow_metric(flow_dict, flow_stat_dict)
                except Exception:
                    # keep polling the other flows; one failing query must not
                    # terminate the monitoring thread
                    logging.exception('could not update flow metric {0} on {1}:{2}'.format(
                        flow_dict['metric_key'], flow_dict['vnf_name'], flow_dict['vnf_interface']))
        # poll periodically instead of busy-looping on the Ryu REST API
        time.sleep(interval)
def get_network_metrics(self, interval=1.0):
    """Poll Ryu port statistics for all registered port metrics.

    Runs until ``start_monitoring`` is cleared. Metrics are grouped by switch
    so each cycle issues one ``stats/port`` query per switch; every metric on
    that switch is then exported by set_network_metric.

    :param interval: seconds to sleep between polling cycles.
    """
    while self.start_monitoring:
        # ``with`` releases the lock even when a query fails, so setup_metric and
        # stop_metric can never block on a failed polling cycle
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            dpid_set = set(metric_dict['switch_dpid'] for metric_dict in self.network_metrics)
            for dpid in dpid_set:
                try:
                    ret = self.net.ryu_REST('stats/port', dpid=dpid)
                    # NOTE(review): if the reply is JSON text, ast.literal_eval fails on
                    # true/false/null literals - confirm the format; json.loads may fit
                    port_stat_dict = ast.literal_eval(ret)
                    metric_list = [metric_dict for metric_dict in self.network_metrics
                                   if int(metric_dict['switch_dpid']) == int(dpid)]
                    for metric_dict in metric_list:
                        self.set_network_metric(metric_dict, port_stat_dict)
                except Exception:
                    # keep polling the other switches; one failing query must not
                    # terminate the monitoring thread
                    logging.exception('could not update port metrics of switch {0}'.format(dpid))
        # poll periodically instead of busy-looping on the Ryu REST API
        time.sleep(interval)
396 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """Export one switch-port counter from a Ryu ``stats/port`` reply.

    Looks up ``mon_port`` in the reply for the metric's switch, sets the gauge
    (``flow_id`` label None) and pushes the registry to the pushgateway. A rate
    is also derived from the previous sample; it is returned but not exported.

    Called by get_network_metrics while it holds ``monitor_lock``; this method
    must not release that lock or re-enter the polling loop.

    :param metric_dict: entry created by setup_metric; its
        previous_measurement / previous_monitor_time fields are updated.
    :param port_stat_dict: parsed Ryu reply, keyed by ``str(switch_dpid)``.
    :return: the rate in counter units per second, None for a baseline
        sample, or an error string when ``mon_port`` is not in the reply.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = metric_dict['mon_port']

    for port_stat in port_stat_dict[str(switch_dpid)]:
        if int(port_stat['port_no']) != int(mon_port):
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric
        self.prom_metrics[metric_dict['metric_key']].\
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\
            set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

        # also the rate is calculated here, but not used for now
        # (rate can be easily queried from prometheus also)
        previous_monitor_time = metric_dict['previous_monitor_time']
        if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:
            # first sample, or the port uptime did not increase: only store a
            # baseline; the next polling cycle computes the rate
            metric_rate = None
        else:
            time_delta = port_uptime - previous_monitor_time
            metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)

        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return metric_rate

    logging.error('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))
    return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """Export the aggregated counter of a Ryu ``stats/flow`` reply.

    Sums ``byte_count`` (for ``*bytes*`` metric keys) or ``packet_count``
    (for ``*packet*`` keys) over all flow entries returned for the switch,
    sets the gauge labelled with the cookie as ``flow_id`` and pushes the
    registry to the pushgateway. An empty reply exports 0.

    The tx/rx direction is already selected by the flow query (in_port vs
    out_port), so the metric key is used as-is here.

    :param metric_dict: entry created by setup_flow.
    :param flow_stat_dict: parsed Ryu reply, keyed by ``str(switch_dpid)``.
    """
    metric_key = metric_dict['metric_key']
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    cookie = metric_dict['cookie']

    if 'bytes' in metric_key:
        stat_key = 'byte_count'
    elif 'packet' in metric_key:
        stat_key = 'packet_count'
    else:
        stat_key = None

    # aggregate all found flow stats
    flow_stats = flow_stat_dict[str(switch_dpid)]
    counter = sum(flow_stat[stat_key] for flow_stat in flow_stats) if stat_key else 0

    self.prom_metrics[metric_key].\
        labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}).\
        set(counter)

    # 1 single monitor job for all metrics of the SDN controller
    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
# Build the command for a container named "prometheus" that maps host port
# `port` to container port 9090 and mounts prometheus.yml and profile.rules
# from this file's directory into /etc/prometheus, then log it.
# NOTE(review): the lines that open the `cmd` list, name the image, launch the
# command and return its handle are not visible in this view - confirm against
# the full file. __init__ keeps its call to this helper commented out because
# Prometheus is started outside son-emu.
476 def start_Prometheus(self
, port
=9090):
477 # prometheus.yml configuration file is located in the same directory as this file
481 "-p", "{0}:9090".format(port
),
482 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
483 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
484 "--name", "prometheus",
487 logging
.info('Start Prometheus container {0}'.format(cmd
))
# Build the command for a container named "pushgateway" that maps host port
# `port` to container port 9091, then log it.
# NOTE(review): the start of the `cmd` list, the image name, the launch of the
# command and the return value are not visible in this view - confirm. __init__
# keeps its call to this helper commented out (the pushgateway runs outside
# son-emu).
490 def start_PushGateway(self
, port
=9091):
494 "-p", "{0}:9091".format(port
),
495 "--name", "pushgateway",
499 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
# Build the command for a google/cadvisor:latest container that mounts the
# host's /, /var/run, /sys and /var/lib/docker (all read-only except /var/run)
# and publishes container port 8080 on host port `port`, then log it.
# NOTE(review): the start of the `cmd` list, the launch of the command and the
# return value are not visible in this view - confirm. __init__ keeps its call
# to this helper commented out.
502 def start_cadvisor(self
, port
=8090):
506 "--volume=/:/rootfs:ro",
507 "--volume=/var/run:/var/run:rw",
508 "--volume=/sys:/sys:ro",
509 "--volume=/var/lib/docker/:/var/lib/docker:ro",
510 "--publish={0}:8080".format(port
),
512 "google/cadvisor:latest"
514 logging
.info('Start cAdvisor container {0}'.format(cmd
))
# NOTE(review): the `def` line of this shutdown method is not visible in this
# view - confirm its name and signature against the full file.
# Clear the flag that the get_network_metrics / get_flow_metrics loops test,
# then wait for both polling threads to finish.
518 # stop the monitoring thread
519 self
.start_monitoring
= False
520 self
.monitor_thread
.join()
521 self
.monitor_flow_thread
.join()
523 # these containers are used for monitoring but are started now outside of son-emu
# NOTE(review): confirm that prometheus_process, pushgateway_process and
# cadvisor_process are initialised somewhere (the start_* calls in __init__
# are commented out); otherwise these checks raise AttributeError - unless the
# lines below are disabled, e.g. inside a string literal not visible here.
# Each block terminates/kills the launcher process and removes the container
# via _stop_container.
525 if self.prometheus_process is not None:
526 logging.info('stopping prometheus container')
527 self.prometheus_process.terminate()
528 self.prometheus_process.kill()
529 self._stop_container('prometheus')
531 if self.pushgateway_process is not None:
532 logging.info('stopping pushgateway container')
533 self.pushgateway_process.terminate()
534 self.pushgateway_process.kill()
535 self._stop_container('pushgateway')
537 if self.cadvisor_process is not None:
538 logging.info('stopping cadvisor container')
539 self.cadvisor_process.terminate()
540 self.cadvisor_process.kill()
541 self._stop_container('cadvisor')
def switch_tx_rx(self, metric=''):
    """Swap the tx/rx direction in a metric name.

    When monitoring vnfs, the tx of the datacenter switch is actually the rx
    of the vnf (and vice versa), so a VNF-side key such as 'tx_packets' must
    be read as 'rx_packets' in the switch port stats. Exactly one direction
    is swapped ('tx' takes precedence); names containing neither are
    returned unchanged.

    :param metric: metric key, e.g. 'tx_packets' or 'rx_bytes'.
    :return: the metric key with the opposite direction.
    """
    if 'tx' in metric:
        return metric.replace('tx', 'rx')
    if 'rx' in metric:
        return metric.replace('rx', 'tx')
    return metric
554 def _stop_container(self
, name
):