074506b476bf5a65243c39cc80a8b7112e6c434f
2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
30 from mininet
.node
import OVSSwitch
33 from prometheus_client
import start_http_server
, Summary
, Histogram
, Gauge
, Counter
, REGISTRY
, CollectorRegistry
, \
34 pushadd_to_gateway
, push_to_gateway
, delete_from_gateway
36 from subprocess
import Popen
40 logging
.basicConfig(level
=logging
.INFO
)
43 class to read openflow stats from the Ryu controller of the DCNetwork
46 class DCNetworkMonitor():
def __init__(self, net):
    """Create the Prometheus gauges and bookkeeping state, then start polling.

    :param net: network object kept as ``self.net``; the other methods use
        its ``DCNetwork_graph``, ``getNodeByName`` and ``ryu_REST`` members.
    """
    self.net = net

    # TODO: these global variables should be part of a config file?
    # Prometheus is started outside of son-emu; the former settings were:
    #   prometheus_ip = '127.0.0.1'
    #   prometheus_port = '9090'
    #   self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)

    # The pushgateway is started outside of son-emu as well, and son-emu is
    # started with net=host, so localhost:9091 works. When the sdk is started
    # with docker-compose, 'pushgateway:9091' could be used instead.
    self.pushgateway = 'localhost:9091'

    # Supported Prometheus metrics: four gauges in one private registry,
    # all carrying the same three labels.
    self.registry = CollectorRegistry()

    def new_gauge(name, description):
        return Gauge(name, description,
                     ['vnf_name', 'vnf_interface', 'flow_id'],
                     registry=self.registry)

    self.prom_tx_packet_count = new_gauge('sonemu_tx_count_packets', 'Total number of packets sent')
    self.prom_rx_packet_count = new_gauge('sonemu_rx_count_packets', 'Total number of packets received')
    self.prom_tx_byte_count = new_gauge('sonemu_tx_count_bytes', 'Total number of bytes sent')
    self.prom_rx_byte_count = new_gauge('sonemu_rx_count_bytes', 'Total number of bytes received')

    # Lookup from metric key to gauge.
    self.prom_metrics = {
        'tx_packets': self.prom_tx_packet_count,
        'rx_packets': self.prom_rx_packet_count,
        'tx_bytes': self.prom_tx_byte_count,
        'rx_bytes': self.prom_rx_byte_count,
    }

    # Lists of installed metrics to monitor. Each entry is a dict; among its
    # fields are previous_measurement and previous_monitor_time (both 0 at start).
    self.network_metrics = []
    self.flow_metrics = []
    # One lock per list, shared with the corresponding polling thread.
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()

    # Start the monitoring threads; they loop while start_monitoring is true.
    self.start_monitoring = True
    self.monitor_thread = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread.start()

    self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread.start()

    # cAdvisor, Prometheus pushgateway and DB are started as external container, outside of son-emu
108 # first set some parameters, before measurement can start
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """Register a flow counter of a VNF interface for periodic export.

    Looks up, in ``self.net.DCNetwork_graph``, the link whose ``src_port_id``
    equals ``vnf_interface`` and appends a descriptor dict to
    ``self.flow_metrics`` (under ``self.monitor_flow_lock``).

    :param vnf_name: graph node name of the VNF.
    :param vnf_interface: ``src_port_id`` to monitor; ``None`` selects link 0
        towards the first neighbour.
    :param metric: metric key; ``None`` falls back to ``'tx_packets'``.
    :param cookie: value stored under the ``'cookie'`` key of the descriptor.
    :return: a status/error message string, or ``None`` when the connected
        node is not an ``OVSSwitch``.
    """
    graph = self.net.DCNetwork_graph

    # check if port is specified (vnf:port)
    if vnf_interface is None:
        # Take the first interface by default. list() tolerates neighbors()
        # returning an iterator; with no neighbour at all the interface stays
        # None and the "not found" message below is returned instead of an
        # IndexError escaping to the caller.
        neighbours = list(graph.neighbors(vnf_name))
        if neighbours:
            vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    flow_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    vnf_switch = None
    for connected_sw in graph.neighbors(vnf_name):
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                vnf_switch = connected_sw
                flow_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if not vnf_switch:
        # No exception is active here, so log a plain error record.
        message = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
        logging.error(message)
        return message

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)
        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0
        # the dpid attribute is parsed as a hexadecimal string
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        # The context manager releases the lock even if append() fails.
        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        message = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(
            vnf_name, vnf_interface, metric, cookie)
        logging.info(message)
        return message

    except Exception as ex:
        logging.exception("setup_flow error.")
        return str(ex)
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
    """Stop exporting one flow counter that was installed with ``setup_flow``.

    Removes the matching descriptor from ``self.flow_metrics``, drops the
    corresponding labelled series from every collector in ``self.registry``
    and deletes the ``'sonemu-SDNcontroller'`` job from the pushgateway.

    :param vnf_name: graph node name of the VNF.
    :param vnf_interface: monitored ``src_port_id``; when ``None`` and a
        metric is given, link 0 towards the first neighbour is used.
    :param metric: metric key the descriptor was registered with.
    :param cookie: cookie the descriptor was registered with.
    :return: a "Stopped ..." message, or an "Error stopping ..." message when
        no descriptor matches.
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        graph = self.net.DCNetwork_graph
        connected_sw = list(graph.neighbors(vnf_name))[0]
        vnf_interface = graph[vnf_name][connected_sw][0]['src_port_id']

    # Search and cleanup happen under the lock; the context manager releases
    # it even when the registry or the pushgateway call raises, so the polling
    # thread cannot be blocked forever by a failed stop request.
    with self.monitor_flow_lock:
        match = None
        for flow_dict in self.flow_metrics:
            if (flow_dict['vnf_name'] == vnf_name
                    and flow_dict['vnf_interface'] == vnf_interface
                    and flow_dict['metric_key'] == metric
                    and flow_dict['cookie'] == cookie):
                match = flow_dict
                break

        if match is None:
            return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(
                metric, vnf_name, vnf_interface)

        self.flow_metrics.remove(match)

        # NOTE(review): prometheus_client presumably keys its private
        # ``_metrics`` dict by stringified label values, so the raw int cookie
        # (default 0) would never match - confirm against the installed version.
        label_key = (str(vnf_name), str(vnf_interface), str(cookie))
        for collector in self.registry._collectors:
            if label_key in collector._metrics:
                collector.remove(vnf_name, vnf_interface, cookie)

        # The whole job is deleted; no per-metric grouping key is passed.
        delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

    message = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(
        vnf_name, vnf_interface, metric, cookie)
    logging.info(message)
    return message
197 # first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Register a port counter of a VNF interface for periodic export.

    Looks up, in ``self.net.DCNetwork_graph``, the link whose ``src_port_id``
    equals ``vnf_interface`` and appends a descriptor dict to
    ``self.network_metrics`` (under ``self.monitor_lock``).

    :param vnf_name: graph node name of the VNF.
    :param vnf_interface: ``src_port_id`` to monitor; ``None`` selects link 0
        towards the first neighbour.
    :param metric: metric key; ``None`` falls back to ``'tx_packets'``.
    :return: a status/error message string, or ``None`` when the VNF has
        several neighbours, none, or a neighbour that is not an ``OVSSwitch``.
    """
    graph = self.net.DCNetwork_graph

    # check if port is specified (vnf:port)
    if vnf_interface is None:
        # Take the first interface by default; an unconnected VNF falls
        # through to the "not found" message instead of raising IndexError.
        neighbours = list(graph.neighbors(vnf_name))
        if neighbours:
            vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    network_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    for connected_sw in graph.neighbors(vnf_name):
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                network_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if 'mon_port' not in network_metric:
        # No exception is active here, so log a plain error record.
        message = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
        logging.error(message)
        return message

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        # list() so that len()/indexing also work on an iterator result
        vnf_switch = list(graph.neighbors(str(vnf_name)))

        if len(vnf_switch) > 1:
            logging.info("vnf: {0} has multiple ports".format(vnf_name))
            return
        if not vnf_switch:
            logging.info("vnf: {0} is not connected".format(vnf_name))
            return

        next_node = self.net.getNodeByName(vnf_switch[0])
        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0
        # the dpid attribute is parsed as a hexadecimal string
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        # The context manager releases the lock even if append() fails.
        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        message = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(message)
        return message

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """Stop exporting port counters installed with ``setup_metric``.

    Two modes:

    * ``vnf_interface`` and ``metric`` both ``None``: every descriptor of
      ``vnf_name`` is removed from ``self.network_metrics``.
    * otherwise: the single descriptor matching name, interface and metric
      key is removed (a missing interface defaults to link 0 towards the
      first neighbour).

    In both modes the port series (flow label ``'None'``) are removed from the
    collectors in ``self.registry`` and the ``'sonemu-SDNcontroller'`` job is
    deleted from the pushgateway.

    :return: a "Stopped ..." message, or an "Error stopping ..." message when
        nothing matched.
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        graph = self.net.DCNetwork_graph
        connected_sw = list(graph.neighbors(vnf_name))[0]
        vnf_interface = graph[vnf_name][connected_sw][0]['src_port_id']

    not_found = 'Error stopping monitoring metric: {0} on {1}:{2}'.format(
        metric, vnf_name, vnf_interface)

    # The context manager releases the lock on every exit path, including an
    # exception from the registry or from the pushgateway call.
    with self.monitor_lock:
        if vnf_interface is None and metric is None:
            # delete everything from this vnf: all its descriptors go, so the
            # polling thread does not re-export the remaining ones.
            remaining = [entry for entry in self.network_metrics
                         if entry['vnf_name'] != vnf_name]
            if len(remaining) == len(self.network_metrics):
                return not_found
            self.network_metrics[:] = remaining

            for collector in self.registry._collectors:
                # iterate over a snapshot, remove() mutates the dict
                for name, interface, flow_id in list(collector._metrics):
                    # Only port series carry the 'None' flow label; flow
                    # series are left to stop_flow().
                    if name == vnf_name and flow_id == 'None':
                        logging.info('3 name:{0} labels:{1} metrics:{2}'.format(
                            collector._name, collector._labelnames, collector._metrics))
                        collector.remove(name, interface, flow_id)

            delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
            logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
            return 'Stopped monitoring: {0}'.format(vnf_name)

        match = None
        for metric_dict in self.network_metrics:
            if (metric_dict['vnf_name'] == vnf_name
                    and metric_dict['vnf_interface'] == vnf_interface
                    and metric_dict['metric_key'] == metric):
                match = metric_dict
                break
        if match is None:
            return not_found

        self.network_metrics.remove(match)

        # Unregistering the gauge would drop the complete metric with all its
        # labels, so only the labelled series of this interface is removed.
        for collector in self.registry._collectors:
            logging.debug('{0}'.format(collector._metrics.values()))
            if (vnf_name, vnf_interface, 'None') in collector._metrics:
                logging.info('2 name:{0} labels:{1} metrics:{2}'.format(
                    collector._name, collector._labelnames, collector._metrics))
                collector.remove(vnf_name, vnf_interface, 'None')

        # 1 single monitor job for all metrics of the SDN controller: only the
        # grouping key (the job) can be deleted from the pushgateway, not
        # individual metric labels. Removing metrics separately would need a
        # separate grouping key, and probably a different registry as well.
        delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

    message = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
    logging.info(message)
    return message
341 # get all metrics defined in the list and export it to Prometheus
def get_flow_metrics(self):
    """Polling loop for flow counters; runs until ``self.start_monitoring`` is false.

    For each descriptor in ``self.flow_metrics`` a ``stats/flow`` request is
    sent through ``self.net.ryu_REST`` and the parsed reply is handed to
    ``self.set_flow_metric``. The list is read under ``self.monitor_flow_lock``.
    """
    import time  # function-scope import keeps the block self-contained

    while self.start_monitoring:
        # The context manager guarantees the lock is released after each
        # round, so setup_flow/stop_flow are never blocked by a failed poll.
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                try:
                    data = {
                        'cookie': flow_dict['cookie'],
                        'cookie_mask': flow_dict['cookie'],
                    }
                    # tx metrics filter on the ingress port, rx metrics on the output port
                    if 'tx' in flow_dict['metric_key']:
                        data['match'] = {'in_port': flow_dict['mon_port']}
                    elif 'rx' in flow_dict['metric_key']:
                        data['out_port'] = flow_dict['mon_port']

                    # query Ryu
                    ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                    flow_stat_dict = ast.literal_eval(ret)

                    logging.debug('received flow stat:{0} '.format(flow_stat_dict))
                    self.set_flow_metric(flow_dict, flow_stat_dict)
                except Exception:
                    # One failing descriptor must not terminate the thread.
                    logging.exception('error while polling flow metric {0}'.format(flow_dict))

        # pause between polling rounds, outside the lock
        time.sleep(1)
def get_network_metrics(self):
    """Polling loop for port counters; runs until ``self.start_monitoring`` is false.

    Descriptors in ``self.network_metrics`` are grouped by ``switch_dpid`` so
    that one ``stats/port`` request per switch is sent through
    ``self.net.ryu_REST``; each descriptor of that switch is then passed to
    ``self.set_network_metric`` together with the parsed reply.
    """
    import time  # function-scope import keeps the block self-contained

    while self.start_monitoring:
        # The context manager guarantees the lock is released after each
        # round, so setup_metric/stop_metric are never blocked by a failed poll.
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            dpid_set = set(entry['switch_dpid'] for entry in self.network_metrics)

            for dpid in dpid_set:
                try:
                    # query Ryu
                    ret = self.net.ryu_REST('stats/port', dpid=dpid)
                    port_stat_dict = ast.literal_eval(ret)

                    for metric_dict in self.network_metrics:
                        if int(metric_dict['switch_dpid']) == int(dpid):
                            self.set_network_metric(metric_dict, port_stat_dict)
                except Exception:
                    # One failing switch must not terminate the thread.
                    logging.exception('error while polling port stats of dpid {0}'.format(dpid))

        # pause between polling rounds, outside the lock
        time.sleep(1)
393 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """Export one port counter from a Ryu port-stats reply to Prometheus.

    :param metric_dict: descriptor created by ``setup_metric``; its
        ``previous_measurement`` / ``previous_monitor_time`` fields are
        updated in place.
    :param port_stat_dict: parsed ``stats/port`` reply, indexed by the
        switch dpid as a string.
    :return: ``None`` after a successful update, or an error message string
        when the monitored port is not in the reply.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = metric_dict['mon_port']

    for port_stat in port_stat_dict[str(switch_dpid)]:
        if int(port_stat['port_no']) != int(mon_port):
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric (port series use flow_id None)
        self.prom_metrics[metric_dict['metric_key']].\
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\
            set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

        # Remember the sample as baseline for a later rate calculation. The
        # rate itself is not computed here (it was never used and can be
        # queried from Prometheus). The lock is owned by the calling polling
        # loop, so it is neither released nor is the loop re-entered from here.
        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return

    # No exception is active here, so log a plain error record.
    message = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
    logging.error(message)
    return message
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """Export one aggregated flow counter from a Ryu flow-stats reply.

    The ``byte_count`` or ``packet_count`` fields (chosen by the metric key)
    of all flow entries returned for the switch are summed, written to the
    gauge selected by the metric key and pushed to the pushgateway.

    :param metric_dict: descriptor created by ``setup_flow``.
    :param flow_stat_dict: parsed ``stats/flow`` reply, indexed by the
        switch dpid as a string.
    """
    metric_key = metric_dict['metric_key']
    flow_stats = flow_stat_dict[str(metric_dict['switch_dpid'])]

    if 'bytes' in metric_key:
        counter = sum(flow_stat['byte_count'] for flow_stat in flow_stats)
    elif 'packet' in metric_key:
        counter = sum(flow_stat['packet_count'] for flow_stat in flow_stats)
    else:
        counter = 0

    # An empty reply (no flow matches the cookie/port filter) is exported as
    # 0; nothing indexes the first entry any more, so it cannot raise here.
    self.prom_metrics[metric_key]. \
        labels({'vnf_name': metric_dict['vnf_name'],
                'vnf_interface': metric_dict['vnf_interface'],
                'flow_id': metric_dict['cookie']}). \
        set(counter)
    # 1 single monitor job for all metrics of the SDN controller
    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
def start_Prometheus(self, port=9090):
    """Launch a Prometheus container through the docker CLI.

    :param port: host port mapped to the container's port 9090.
    :return: the ``Popen`` handle of the docker command.
    """
    # prometheus.yml configuration file is located in the same directory as this file
    here = os.path.dirname(os.path.abspath(__file__))
    config_mount = "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(here)
    rules_mount = "{0}/profile.rules:/etc/prometheus/profile.rules".format(here)

    # NOTE(review): confirm the leading "docker run --rm" arguments and the
    # image name against the repository before relying on this command.
    cmd = ["docker", "run", "--rm"]
    cmd += ["-p", "{0}:9090".format(port)]
    cmd += ["-v", config_mount]
    cmd += ["-v", rules_mount]
    cmd += ["--name", "prometheus"]
    cmd.append("prom/prometheus")

    logging.info('Start Prometheus container {0}'.format(cmd))
    return Popen(cmd)
def start_PushGateway(self, port=9091):
    """Launch a Prometheus pushgateway container through the docker CLI.

    :param port: host port mapped to the container's port 9091.
    :return: the ``Popen`` handle of the docker command.
    """
    port_mapping = "{0}:9091".format(port)

    # NOTE(review): confirm the leading "docker run -d" arguments and the
    # image name against the repository before relying on this command.
    cmd = ["docker", "run", "-d"]
    cmd.extend(["-p", port_mapping])
    cmd.extend(["--name", "pushgateway"])
    cmd.append("prom/pushgateway")

    logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
    return Popen(cmd)
def start_cadvisor(self, port=8090):
    """Launch a cAdvisor container through the docker CLI.

    :param port: host port published for the container's port 8080.
    :return: the ``Popen`` handle of the docker command.
    """
    # host paths mounted into the container
    volumes = [
        "--volume=/:/rootfs:ro",
        "--volume=/var/run:/var/run:rw",
        "--volume=/sys:/sys:ro",
        "--volume=/var/lib/docker/:/var/lib/docker:ro",
    ]

    # NOTE(review): confirm the leading "docker run --rm" arguments and the
    # "--name=cadvisor" option against the repository.
    cmd = ["docker", "run", "--rm"] + volumes + [
        "--publish={0}:8080".format(port),
        "--name=cadvisor",
        "google/cadvisor:latest",
    ]

    logging.info('Start cAdvisor container {0}'.format(cmd))
    return Popen(cmd)
def stop(self):
    """Ask both polling loops to finish and wait for their threads to exit."""
    # Clearing the flag ends the ``while self.start_monitoring`` loops.
    self.start_monitoring = False
    for worker in (self.monitor_thread, self.monitor_flow_thread):
        worker.join()

    # The monitoring containers are now started outside of son-emu, so the
    # former teardown is disabled. For prometheus, pushgateway and cadvisor
    # it did, when the respective *_process attribute was not None:
    #   logging.info('stopping <name> container')
    #   self.<name>_process.terminate()
    #   self.<name>_process.kill()
    #   self._stop_container('<name>')
def switch_tx_rx(self, metric=''):
    """Return ``metric`` with its direction swapped (``tx`` <-> ``rx``).

    When monitoring VNFs, the tx of the datacenter switch is the rx of the
    VNF, so the metric name is changed to stay consistent with the VNF side.
    A name containing neither ``tx`` nor ``rx`` is returned unchanged.
    """
    if 'tx' in metric:
        return metric.replace('tx', 'rx')
    if 'rx' in metric:
        return metric.replace('rx', 'tx')
    return metric
551 def _stop_container(self
, name
):