2 Copyright (c) 2015 SONATA-NFV
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
17 Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
18 nor the names of its contributors may be used to endorse or promote
19 products derived from this software without specific prior written
22 This work has been performed in the framework of the SONATA project,
23 funded by the European Commission under Grant number 671517 through
24 the Horizon 2020 and 5G-PPP programmes. The authors would like to
25 acknowledge the contributions of their colleagues of the SONATA
26 partner consortium (www.sonata-nfv.eu).
31 from mininet
.node
import OVSSwitch
34 from prometheus_client
import start_http_server
, Summary
, Histogram
, Gauge
, Counter
, REGISTRY
, CollectorRegistry
, \
35 pushadd_to_gateway
, push_to_gateway
, delete_from_gateway
37 from subprocess
import Popen
# Configure the root logger at import time so INFO-level messages are emitted.
41 logging
.basicConfig(level
=logging
.INFO
)
44 class to read openflow stats from the Ryu controller of the DCNetwork
# Port of the Prometheus pushgateway: used to build self.pushgateway
# ('localhost:<port>') and as the default of start_PushGateway().
47 PUSHGATEWAY_PORT
= 9091
48 # we cannot use port 8080 because ryu-ofrest api is already using that one
# NOTE(review): the constant the remark above refers to is not visible here --
# presumably CADVISOR_PORT, the default argument of start_cAdvisor(); confirm.
# Sent as 'cookie_mask' next to the flow cookie in each 'stats/flow' request
# built by get_flow_metrics().
51 COOKIE_MASK
= 0xffffffff
53 class DCNetworkMonitor():
def __init__(self, net):
    """Create the gauges, start both polling threads and launch the helper containers.

    :param net: emulated network object. The other methods of this class read
        ``DCNetwork_graph`` from it and call ``getNodeByName()`` and
        ``ryu_REST()`` on it.
    """
    # Keep the network handle: every setup_*/stop_*/get_* method goes through
    # self.net, so it has to exist before the polling threads are started.
    self.net = net
    self.dockercli = docker.from_env()

    # pushgateway address
    self.pushgateway = 'localhost:{0}'.format(PUSHGATEWAY_PORT)

    # supported Prometheus metrics
    self.registry = CollectorRegistry()
    self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                      ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
    self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                      ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
    self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                    ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
    self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                    ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)

    # metric key (as passed to setup_metric / setup_flow) -> gauge
    self.prom_metrics = {
        'tx_packets': self.prom_tx_packet_count,
        'rx_packets': self.prom_rx_packet_count,
        'tx_bytes': self.prom_tx_byte_count,
        'rx_bytes': self.prom_rx_byte_count,
    }

    # list of installed metrics to monitor
    # each entry is a dict filled in by setup_metric / setup_flow with the keys
    #   vnf_name, vnf_interface, mon_port, switch_dpid, metric_key,
    #   previous_measurement, previous_monitor_time (both start at 0)
    # and, for flow entries only, cookie
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()
    self.network_metrics = []
    self.flow_metrics = []

    # start monitoring thread
    # (the flag is what keeps the 'while self.start_monitoring' loops alive)
    self.start_monitoring = True
    self.monitor_thread = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread.start()

    self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread.start()

    # cAdvisor, Prometheus pushgateway are started as external container, to gather monitoring metric in son-emu
    self.pushgateway_process = self.start_PushGateway()
    self.cadvisor_process = self.start_cAdvisor()
107 # first set some parameters, before measurement can start
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """Register a flow counter of one VNF interface for periodic export.

    On success a new entry is appended to ``self.flow_metrics``; the flow
    polling loop picks it up from there.

    :param vnf_name: node name of the VNF in ``self.net.DCNetwork_graph``.
    :param vnf_interface: ``src_port_id`` of the VNF link to monitor. When
        ``None``, link 0 towards the first neighbour is used.
    :param metric: key of ``self.prom_metrics``; ``None`` selects ``'tx_packets'``.
    :param cookie: flow cookie; stored in the entry and used as ``flow_id`` label.
    :return: a status string, or ``None`` when the node behind the interface
        is not an ``OVSSwitch``.
    """
    graph = self.net.DCNetwork_graph
    # list() keeps the indexing below working when neighbors() yields an iterator
    neighbours = list(graph.neighbors(vnf_name))

    # check if port is specified (vnf:port)
    if vnf_interface is None and neighbours:
        # take first interface by default
        vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    flow_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    # find the link that starts at the requested interface; its far end is
    # the switch (and switch port) whose flow counters have to be read
    vnf_switch = None
    for connected_sw in neighbours:
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                vnf_switch = connected_sw
                flow_metric['mon_port'] = link_dict[link]['dst_port_nr']

    if vnf_switch is None:
        message = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
        # no exception is active here, so logging.exception would be misleading
        logging.error(message)
        return message

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return None

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0

        # the dpid attribute is parsed as a hexadecimal string
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        # the polling thread iterates the same list under this lock
        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        message = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
        logging.info(message)
        return message

    except Exception as ex:
        logging.exception("setup_flow error.")
        return str(ex)
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
    """Stop exporting one flow counter that was registered with setup_flow().

    :param vnf_name: VNF name the entry was registered with.
    :param vnf_interface: interface the entry was registered with. When
        ``None`` and ``metric`` is given, link 0 towards the first neighbour
        is used, mirroring setup_flow().
    :param metric: metric key of the entry.
    :param cookie: flow cookie of the entry.
    :return: a status string, either 'Stopped monitoring flow ...' or
        'Error stopping monitoring flow: ...' when no entry matches.
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        graph = self.net.DCNetwork_graph
        connected_sw = list(graph.neighbors(vnf_name))[0]
        vnf_interface = graph[vnf_name][connected_sw][0]['src_port_id']

    # Hold the lock for the scan as well as the removal: the polling thread
    # iterates the same list. 'with' releases the lock even if the gauge
    # lookup or the pushgateway call raises, so the poller cannot be blocked
    # forever by a failed stop.
    with self.monitor_flow_lock:
        for flow_dict in self.flow_metrics:
            if (flow_dict['vnf_name'] == vnf_name
                    and flow_dict['vnf_interface'] == vnf_interface
                    and flow_dict['metric_key'] == metric
                    and flow_dict['cookie'] == cookie):

                self.flow_metrics.remove(flow_dict)

                # a single labelled series is not removed here, so it is set to NaN
                self.prom_metrics[flow_dict['metric_key']]. \
                    labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=cookie). \
                    set(float('nan'))

                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

                message = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(
                    vnf_name, vnf_interface, metric, cookie)
                logging.info(message)
                return message

    return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
197 # first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Register a port counter of one VNF interface for periodic export.

    On success a new entry is appended to ``self.network_metrics``; the port
    polling loop picks it up from there.

    :param vnf_name: node name of the VNF in ``self.net.DCNetwork_graph``.
    :param vnf_interface: ``src_port_id`` of the VNF link to monitor. When
        ``None``, link 0 towards the first neighbour is used.
    :param metric: key of ``self.prom_metrics``; ``None`` selects ``'tx_packets'``.
    :return: a status string, or ``None`` when the node behind the interface
        is not an ``OVSSwitch``.
    """
    graph = self.net.DCNetwork_graph
    # list() keeps the indexing below working when neighbors() yields an iterator
    neighbours = list(graph.neighbors(vnf_name))

    # check if port is specified (vnf:port)
    if vnf_interface is None and neighbours:
        # take first interface by default
        vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    network_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    # Remember which switch the matching link leads to. Using that switch
    # (instead of requiring the VNF to have exactly one neighbour) keeps the
    # dpid consistent with mon_port for VNFs with several ports.
    vnf_switch = None
    for connected_sw in neighbours:
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                vnf_switch = connected_sw
                network_metric['mon_port'] = link_dict[link]['dst_port_nr']

    if 'mon_port' not in network_metric:
        message = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
        # no exception is active here, so logging.exception would be misleading
        logging.error(message)
        return message

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return None

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0

        # the dpid attribute is parsed as a hexadecimal string
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        # the polling thread iterates the same list under this lock
        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        message = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(message)
        return message

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """Stop exporting port counters registered with setup_metric().

    Two modes:

    * ``metric`` given: remove the single entry matching name, interface and
      metric (the interface defaults to link 0 towards the first neighbour).
    * ``vnf_interface`` and ``metric`` both ``None``: remove every entry of
      ``vnf_name``.

    :return: a status string, either 'Stopped monitoring ...' or
        'Error stopping monitoring metric: ...' when nothing matches.
    """
    # check if port is specified (vnf:port)
    if vnf_interface is None and metric is not None:
        # take first interface by default
        graph = self.net.DCNetwork_graph
        connected_sw = list(graph.neighbors(vnf_name))[0]
        vnf_interface = graph[vnf_name][connected_sw][0]['src_port_id']

    remove_whole_vnf = vnf_interface is None and metric is None

    # 'with' releases the lock even if a gauge lookup or the pushgateway call
    # raises; the polling thread needs the same lock on every cycle.
    with self.monitor_lock:
        if remove_whole_vnf:
            # delete everything from this vnf
            vnf_entries = [entry for entry in self.network_metrics
                           if entry['vnf_name'] == vnf_name]
            if vnf_entries:
                # drop every entry, otherwise the poller keeps re-exporting them
                for entry in vnf_entries:
                    self.network_metrics.remove(entry)

                for collector in self.registry._collectors:
                    # iterate a snapshot: remove() mutates collector._metrics
                    for name, interface, flow_id in list(collector._metrics):
                        # Port counters are labelled flow_id=None, stored as
                        # the text 'None'. Only those existing series are
                        # removed, so remove() cannot fail on a label set
                        # that was never created.
                        if name == vnf_name and flow_id == 'None':
                            logging.info('3 name:{0} labels:{1} metrics:{2}'.format(
                                collector._name, collector._labelnames, collector._metrics))
                            collector.remove(name, interface, flow_id)

                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
                logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
                return 'Stopped monitoring: {0}'.format(vnf_name)
        else:
            for metric_dict in self.network_metrics:
                if (metric_dict['vnf_name'] == vnf_name
                        and metric_dict['vnf_interface'] == vnf_interface
                        and metric_dict['metric_key'] == metric):

                    self.network_metrics.remove(metric_dict)

                    # set values to NaN, prometheus api currently does not support removal of metrics
                    self.prom_metrics[metric_dict['metric_key']]. \
                        labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=None). \
                        set(float('nan'))

                    # this removes the complete metric, all labels...
                    # 1 single monitor job for all metrics of the SDN controller
                    # we can only remove from the pushgateway grouping keys (labels) which we have
                    # defined for the add_to_pushgateway
                    # we can not specify labels from the metrics to be removed
                    # if we need to remove the metrics separately, we need to give them a separate
                    # grouping key, and probably a different registry also
                    delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

                    message = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
                    logging.info(message)
                    return message

    return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)
319 # get all metrics defined in the list and export it to Prometheus
def get_flow_metrics(self, interval=1):
    """Polling loop: read flow counters for every registered flow and push them.

    Runs until ``self.start_monitoring`` becomes false. Each cycle queries
    ``self.net.ryu_REST('stats/flow', ...)`` once per entry of
    ``self.flow_metrics``, hands the reply to ``set_flow_metric()`` and, if
    any flow is registered, pushes the registry to the pushgateway.

    :param interval: seconds to wait between two cycles.
    """
    import time  # local import: only the polling loops need it

    try:
        string_types = (basestring,)  # Python 2: covers str and unicode
    except NameError:
        string_types = (str,)

    while self.start_monitoring:
        # 'with' guarantees the lock is released after every cycle, even when
        # something below raises, so setup_flow()/stop_flow() cannot be
        # locked out by a crashed poller.
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                try:
                    data = {'cookie': flow_dict['cookie'], 'cookie_mask': COOKIE_MASK}

                    if 'tx' in flow_dict['metric_key']:
                        data['match'] = {'in_port': flow_dict['mon_port']}
                    elif 'rx' in flow_dict['metric_key']:
                        data['out_port'] = flow_dict['mon_port']

                    # query Ryu
                    ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                    if isinstance(ret, dict):
                        flow_stat_dict = ret
                    elif isinstance(ret, string_types):
                        flow_stat_dict = ast.literal_eval(ret.rstrip())
                    else:
                        flow_stat_dict = None

                    logging.debug('received flow stat:{0} '.format(flow_stat_dict))

                    if flow_stat_dict is None:
                        # nothing usable came back; try again next cycle
                        continue

                    self.set_flow_metric(flow_dict, flow_stat_dict)
                except Exception:
                    # one bad reply must not end the monitoring thread
                    logging.exception('could not update flow metric {0}'.format(flow_dict))

            try:
                if self.flow_metrics:
                    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
            except Exception as e:
                logging.warning("Pushgateway not reachable: {0} {1}".format(Exception, e))

        # wait outside the lock so callers can register or remove flows meanwhile
        time.sleep(interval)
def get_network_metrics(self, interval=1):
    """Polling loop: read port counters for every registered metric and push them.

    Runs until ``self.start_monitoring`` becomes false. Each cycle queries
    ``self.net.ryu_REST('stats/port', dpid=...)`` once per switch, hands the
    reply to ``set_network_metric()`` for every entry on that switch and, if
    any metric is registered, pushes the registry to the pushgateway.

    :param interval: seconds to wait between two cycles.
    """
    import time  # local import: only the polling loops need it

    try:
        string_types = (basestring,)  # Python 2: covers str and unicode
    except NameError:
        string_types = (str,)

    while self.start_monitoring:
        # 'with' guarantees the lock is released after every cycle, even when
        # something below raises, so setup_metric()/stop_metric() cannot be
        # locked out by a crashed poller.
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            metrics_by_dpid = {}
            for metric_dict in self.network_metrics:
                metrics_by_dpid.setdefault(int(metric_dict['switch_dpid']), []).append(metric_dict)

            for dpid, metric_list in metrics_by_dpid.items():
                try:
                    # query Ryu
                    ret = self.net.ryu_REST('stats/port', dpid=dpid)
                    if isinstance(ret, dict):
                        port_stat_dict = ret
                    elif isinstance(ret, string_types):
                        port_stat_dict = ast.literal_eval(ret.rstrip())
                    else:
                        port_stat_dict = None
                except Exception:
                    # one bad reply must not end the monitoring thread
                    logging.exception('could not read port stats of switch {0}'.format(dpid))
                    continue

                if port_stat_dict is None:
                    # nothing usable came back; try again next cycle
                    continue

                for metric_dict in metric_list:
                    try:
                        self.set_network_metric(metric_dict, port_stat_dict)
                    except Exception:
                        logging.exception('could not update metric {0}'.format(metric_dict))

            try:
                if self.network_metrics:
                    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
            except Exception as e:
                logging.warning("Pushgateway not reachable: {0} {1}".format(Exception, e))

        # wait outside the lock so callers can register or remove metrics meanwhile
        time.sleep(interval)
395 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """Copy one port counter from a Ryu port-stats reply into its gauge.

    :param metric_dict: entry of ``self.network_metrics``; its
        ``previous_measurement`` / ``previous_monitor_time`` are updated.
    :param port_stat_dict: parsed reply, mapping ``str(dpid)`` to a list of
        per-port dicts (``port_no``, ``duration_sec``, ``duration_nsec`` and
        the counters); may be ``None``.
    :return: ``None`` when the gauge was updated, otherwise a
        'metric ... not found ...' string.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = metric_dict['mon_port']

    def is_monitored_port(port_no):
        # Compare numerically where possible; fall back to text so that a
        # non-numeric port name in the reply cannot raise.
        try:
            return int(port_no) == int(mon_port)
        except (TypeError, ValueError):
            return str(port_no) == str(mon_port)

    # an unusable reply or an unknown dpid is reported like a missing port
    port_stats = (port_stat_dict or {}).get(str(switch_dpid)) or []

    for port_stat in port_stats:
        if not is_monitored_port(port_stat['port_no']):
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric
        self.prom_metrics[metric_dict['metric_key']].\
            labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=None).\
            set(this_measurement)

        # No rate is derived here (it can be queried from prometheus); the
        # last sample is kept so one could be computed from the entry later.
        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return None

    message = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
    # no exception is active here, so these are plain error records
    logging.error(message)
    logging.error('monport:{0}, dpid:{1}'.format(mon_port, switch_dpid))
    logging.error('port dict:{0}'.format(port_stat_dict))
    return message
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """Sum the counters of all flows in a Ryu flow-stats reply into one gauge.

    :param metric_dict: entry of ``self.flow_metrics``.
    :param flow_stat_dict: parsed reply, mapping ``str(dpid)`` to a list of
        per-flow dicts carrying ``byte_count`` and ``packet_count``; may be
        ``None``.
    :return: ``None``. The gauge is left untouched when the reply holds no
        list for the switch.
    """
    # the metric key is used as registered (no tx/rx swap for flow counters)
    metric_key = metric_dict['metric_key']
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    cookie = metric_dict['cookie']

    flow_stats = flow_stat_dict.get(str(switch_dpid)) if flow_stat_dict else None
    if flow_stats is None:
        logging.warning('no flow stats for switch {0} ({1}:{2})'.format(switch_dpid, vnf_name, vnf_interface))
        return None

    # several flow entries can match the cookie/port filter; export their total
    counter = 0
    for flow_stat in flow_stats:
        if 'bytes' in metric_key:
            counter += flow_stat['byte_count']
        elif 'packet' in metric_key:
            counter += flow_stat['packet_count']

    # flow uptime is not evaluated for now (can give error)

    self.prom_metrics[metric_dict['metric_key']]. \
        labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=cookie). \
        set(counter)
    return None
465 def start_Prometheus(self
, port
=9090):
# Assembles and logs the argument list for a container named "prometheus".
# :param port: host port mapped to container port 9090 (the "-p" argument).
# NOTE(review): the head of the cmd list (executable and sub-command) is not
# visible in this extract -- confirm against the full file.
466 # prometheus.yml configuration file is located in the same directory as this file
470 "-p", "{0}:9090".format(port
),
# Both "-v" arguments bind-mount a file from this module's directory
# (prometheus.yml, profile.rules) to /etc/prometheus/ inside the container.
471 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
472 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
473 "--name", "prometheus",
# NOTE(review): the image argument, the end of the list and whatever is done
# with cmd after logging are not visible here -- confirm.
476 logging
.info('Start Prometheus container {0}'.format(cmd
))
479 def start_PushGateway(self
, port
=PUSHGATEWAY_PORT
):
# Assembles and logs the argument list for a container named "pushgateway".
# __init__ stores this method's return value in self.pushgateway_process.
# :param port: host port mapped to container port 9091 (the "-p" argument).
# NOTE(review): the head of the cmd list, the image argument and the return
# statement are not visible in this extract -- confirm against the full file.
483 "-p", "{0}:9091".format(port
),
484 "--name", "pushgateway",
# The container carries an empty "com.containernet" label.
485 "--label", 'com.containernet=""',
489 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
492 def start_cAdvisor(self
, port
=CADVISOR_PORT
):
# Assembles and logs the argument list for the "google/cadvisor:latest" image.
# __init__ stores this method's return value in self.cadvisor_process.
# :param port: host port published for container port 8080.
# NOTE(review): the head of the cmd list, the argument between "--publish"
# and "--label", and the return statement are not visible in this extract --
# confirm against the full file.
# The "--volume" arguments mount /, /sys and /var/lib/docker read-only and
# /var/run read-write into the container.
496 "--volume=/:/rootfs:ro",
497 "--volume=/var/run:/var/run:rw",
498 "--volume=/sys:/sys:ro",
499 "--volume=/var/lib/docker/:/var/lib/docker:ro",
500 "--publish={0}:8080".format(port
),
502 "--label",'com.containernet=""',
503 "google/cadvisor:latest"
505 logging
.info('Start cAdvisor container {0}'.format(cmd
))
# NOTE(review): the statements below do not read as part of start_cAdvisor.
# They look like the body of a separate teardown method whose 'def' line is
# missing from this extract -- confirm its name and signature.
# Clearing start_monitoring ends the 'while self.start_monitoring' loops of
# both polling threads; join() then waits for each thread to finish.
509 # stop the monitoring thread
510 self
.start_monitoring
= False
511 self
.monitor_thread
.join()
512 self
.monitor_flow_thread
.join()
514 # these containers are used for monitoring but are started now outside of son-emu
# Each helper container is removed only when its start_* call stored
# something other than None in __init__.
516 if self
.pushgateway_process
is not None:
517 logging
.info('stopping pushgateway container')
518 self
._stop
_container
('pushgateway')
520 if self
.cadvisor_process
is not None:
521 logging
.info('stopping cadvisor container')
522 self
._stop
_container
('cadvisor')
def switch_tx_rx(self, metric=''):
    """Return ``metric`` with its direction flipped ('tx' <-> 'rx').

    :param metric: metric key such as ``'tx_packets'`` or ``'rx_bytes'``.
    :return: the key seen from the other end of the link; keys containing
        neither 'tx' nor 'rx' are returned unchanged.
    """
    # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
    # so we need to change the metric name to be consistent with the vnf rx or tx
    # The branches are exclusive: running both replacements one after the
    # other would turn every key back into a 'tx' key.
    if 'tx' in metric:
        return metric.replace('tx', 'rx')
    if 'rx' in metric:
        return metric.replace('rx', 'tx')
    return metric
def _stop_container(self, name):
    """Force-remove the Docker container called ``name``.

    Cleanup is best effort: a failure (for example because the container was
    never created) is logged instead of raised, so that the caller can go on
    removing the remaining helper containers.

    :param name: container name, e.g. ``'pushgateway'`` or ``'cadvisor'``.
    """
    try:
        container = self.dockercli.containers.get(name)
        # force=True removes the container even while it is still running
        container.remove(force=True)
    except Exception:
        logging.exception('could not remove container {0}'.format(name))