# 269a7e0564c6a899d57e1eb1e6e5d145a622ee71
"""
Copyright (c) 2015 SONATA-NFV

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Neither the name of the SONATA-NFV [, ANY ADDITIONAL AFFILIATION]
nor the names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.

This work has been performed in the framework of the SONATA project,
funded by the European Commission under Grant number 671517 through
the Horizon 2020 and 5G-PPP programmes. The authors would like to
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
"""
import ast
import json
import logging
import os
import threading
import time
from subprocess import Popen

import docker
from mininet.node import OVSSwitch
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
    pushadd_to_gateway, push_to_gateway, delete_from_gateway
# Configure the root logger once at import time so that the monitor's
# info-level messages are emitted.
logging.basicConfig(level=logging.INFO)

# This module provides a class to read openflow stats from the Ryu controller
# of the DCNetwork.
PUSHGATEWAY_PORT = 9091
# we cannot use port 8080 because ryu-ofrest api is already using that one
# NOTE(review): the CADVISOR_PORT assignment is missing from the garbled source;
# 8081 is assumed here -- confirm before merging.
CADVISOR_PORT = 8081

COOKIE_MASK = 0xffffffff

# String types accepted as a raw (not yet parsed) reply from the Ryu REST API.
try:
    _STRING_TYPES = (basestring,)  # Python 2
except NameError:
    _STRING_TYPES = (str, bytes)  # Python 3


class DCNetworkMonitor():
    """Read openflow stats from the Ryu controller of the DCNetwork.

    Two polling threads (one for port counters, one for flow counters) query
    the controller through ``net.ryu_REST`` about once per second, store the
    values in Prometheus gauges and push the registry to a pushgateway at
    ``localhost:PUSHGATEWAY_PORT``.

    Each entry of ``network_metrics`` / ``flow_metrics`` is a dict with the
    keys ``switch_dpid``, ``vnf_name``, ``vnf_interface``,
    ``previous_measurement``, ``previous_monitor_time``, ``metric_key`` and
    ``mon_port`` (flow entries additionally carry ``cookie``).
    """

    # single pushgateway job that groups all metrics of the SDN controller
    _PUSH_JOB = 'sonemu-SDNcontroller'

    def __init__(self, net):
        """Create the gauges, start both polling threads and the helper containers.

        :param net: the DCNetwork; must provide ``DCNetwork_graph``,
            ``getNodeByName`` and ``ryu_REST``.
        """
        # NOTE(review): restored assignment; every method reads self.net.
        self.net = net
        self.dockercli = docker.from_env()

        # pushgateway address
        self.pushgateway = 'localhost:{0}'.format(PUSHGATEWAY_PORT)

        # supported Prometheus metrics
        self.registry = CollectorRegistry()
        label_names = ['vnf_name', 'vnf_interface', 'flow_id']
        self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                          label_names, registry=self.registry)
        self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                          label_names, registry=self.registry)
        self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                        label_names, registry=self.registry)
        self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                        label_names, registry=self.registry)

        # lookup from metric key (as passed to setup_metric/setup_flow) to gauge
        self.prom_metrics = {'tx_packets': self.prom_tx_packet_count,
                             'rx_packets': self.prom_rx_packet_count,
                             'tx_bytes': self.prom_tx_byte_count,
                             'rx_bytes': self.prom_rx_byte_count}

        # lists of installed metrics to monitor, each guarded by its own lock
        self.monitor_lock = threading.Lock()
        self.monitor_flow_lock = threading.Lock()
        self.network_metrics = []
        self.flow_metrics = []
        self.skewmon_metrics = {}

        # start monitoring threads
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
        self.monitor_flow_thread.start()

        # cAdvisor, Prometheus pushgateway are started as external container, to gather monitoring metric in son-emu
        self.pushgateway_process = self.start_PushGateway()
        self.cadvisor_process = self.start_cAdvisor()

    def _default_interface(self, vnf_name):
        """Return the ``src_port_id`` of link 0 towards the first neighbour of ``vnf_name``."""
        # list() keeps this working when neighbors() returns an iterator
        connected_sw = list(self.net.DCNetwork_graph.neighbors(vnf_name))[0]
        link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
        return link_dict[0]['src_port_id']

    def _find_link(self, vnf_name, vnf_interface):
        """Return ``(switch, dst_port_nr)`` of the link that starts at ``vnf_interface``.

        Returns ``(None, None)`` when no link of ``vnf_name`` has that ``src_port_id``.
        """
        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    return connected_sw, link_dict[link]['dst_port_nr']
        return None, None

    @staticmethod
    def _parse_stats_reply(ret):
        """Turn a ``ryu_REST`` reply into a dict, or ``None`` if it cannot be used.

        A dict is returned unchanged; a string is parsed with
        ``ast.literal_eval`` after stripping trailing whitespace; anything
        else, or a string that is not a valid literal, yields ``None``.
        """
        if isinstance(ret, dict):
            return ret
        if isinstance(ret, _STRING_TYPES):
            if isinstance(ret, bytes) and not isinstance(ret, str):
                # Python 3 byte string: literal_eval only parses text
                ret = ret.decode('utf-8')
            try:
                return ast.literal_eval(ret.rstrip())
            except (ValueError, SyntaxError):
                logging.warning('could not parse Ryu reply: {0!r}'.format(ret))
                return None
        return None

    # first set some parameters, before measurement can start
    def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
        """Start monitoring the flow identified by ``cookie`` on a VNF interface.

        :param vnf_name: name of the VNF node in the DCNetwork graph
        :param vnf_interface: interface (``src_port_id``) to monitor; the first
            interface is taken when None
        :param metric: key of ``prom_metrics``; None falls back to 'tx_packets'
        :param cookie: flow cookie used to select the flow entries
        :return: a status message, the error text if an exception occurred, or
            None when the neighbouring node is not an OVSSwitch
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            vnf_interface = self._default_interface(vnf_name)

        vnf_switch, mon_port = self._find_link(vnf_name, vnf_interface)
        if not vnf_switch:
            msg = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
            # not inside an exception handler, so error() rather than exception()
            logging.error(msg)
            return msg

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'

            next_node = self.net.getNodeByName(vnf_switch)

            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return None

            flow_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                # the dpid attribute is a hex string
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
                'cookie': cookie,
            }

            with self.monitor_flow_lock:
                self.flow_metrics.append(flow_metric)

            msg = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_flow error.")
            return str(ex)

    def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0,):
        """Stop monitoring a flow metric previously installed with ``setup_flow``.

        The matching entry is removed from ``flow_metrics``, its gauge is set
        to NaN and the job is deleted from the pushgateway.

        :return: a status message, or an error message when no entry matches
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None and metric is not None:
            # take first interface by default
            vnf_interface = self._default_interface(vnf_name)

        # 'with' guarantees the lock is released even if the pushgateway call raises
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                if flow_dict['vnf_name'] == vnf_name and flow_dict['vnf_interface'] == vnf_interface \
                        and flow_dict['metric_key'] == metric and flow_dict['cookie'] == cookie:

                    # safe although we are iterating: we return right after
                    self.flow_metrics.remove(flow_dict)

                    # set value to NaN before the job is dropped from the gateway
                    self.prom_metrics[flow_dict['metric_key']]. \
                        labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=cookie). \
                        set(float('nan'))

                    delete_from_gateway(self.pushgateway, job=self._PUSH_JOB)

                    msg = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(
                        vnf_name, vnf_interface, metric, cookie)
                    logging.info(msg)
                    return msg

        return 'Error stopping monitoring flow: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)

    # first set some parameters, before measurement can start
    def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
        """Start monitoring a port counter of a VNF interface.

        :param vnf_name: name of the VNF node in the DCNetwork graph
        :param vnf_interface: interface (``src_port_id``) to monitor; the first
            interface is taken when None
        :param metric: key of ``prom_metrics``; None falls back to 'tx_packets'
        :return: a status message, the error text if an exception occurred, or
            None when the VNF has zero or several neighbours or its neighbour
            is not an OVSSwitch
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            vnf_interface = self._default_interface(vnf_name)

        _, mon_port = self._find_link(vnf_name, vnf_interface)
        if mon_port is None:
            msg = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(msg)
            return msg

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'

            # list() keeps len()/indexing working when neighbors() returns an iterator
            vnf_switch = list(self.net.DCNetwork_graph.neighbors(str(vnf_name)))

            if len(vnf_switch) > 1:
                logging.info("vnf: {0} has multiple ports".format(vnf_name))
                return None
            elif len(vnf_switch) == 0:
                logging.info("vnf: {0} is not connected".format(vnf_name))
                return None

            next_node = self.net.getNodeByName(vnf_switch[0])

            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return None

            network_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
            }

            with self.monitor_lock:
                self.network_metrics.append(network_metric)

            msg = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_metric error.")
            return str(ex)

    def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
        """Stop monitoring one port metric, or all port metrics of a VNF.

        With both ``vnf_interface`` and ``metric`` set to None every entry of
        ``vnf_name`` is removed; otherwise only the exactly matching entry.

        :return: a status message, or an error message when nothing matches
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None and metric is not None:
            # take first interface by default
            vnf_interface = self._default_interface(vnf_name)

        with self.monitor_lock:
            if vnf_interface is None and metric is None:
                # delete everything from this vnf
                if any(m['vnf_name'] == vnf_name for m in self.network_metrics):
                    # drop *all* entries of the vnf, not just the first one
                    self.network_metrics[:] = [m for m in self.network_metrics
                                               if m['vnf_name'] != vnf_name]
                    for collector in self.registry._collectors:
                        # iterate over a snapshot: remove() mutates _metrics
                        for name, interface, flow_id in list(collector._metrics):
                            # port metrics are labelled flow_id=None ('None' once
                            # stringified); skip flow series so remove() cannot
                            # raise KeyError on a label set that does not exist
                            if name == vnf_name and flow_id == 'None':
                                logging.info('3 name:{0} labels:{1} metrics:{2}'.format(
                                    collector._name, collector._labelnames, collector._metrics))
                                collector.remove(name, interface, 'None')

                    delete_from_gateway(self.pushgateway, job=self._PUSH_JOB)
                    logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
                    return 'Stopped monitoring: {0}'.format(vnf_name)
            else:
                for metric_dict in self.network_metrics:
                    if metric_dict['vnf_name'] == vnf_name and metric_dict['vnf_interface'] == vnf_interface \
                            and metric_dict['metric_key'] == metric:

                        # safe although we are iterating: we return right after
                        self.network_metrics.remove(metric_dict)

                        # set values to NaN, prometheus api currently does not support removal of metrics
                        self.prom_metrics[metric_dict['metric_key']]. \
                            labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=None). \
                            set(float('nan'))

                        # this removes the complete metric, all labels...
                        # 1 single monitor job for all metrics of the SDN controller
                        # we can only remove from the pushgateway grouping keys(labels) which we have
                        # defined for the add_to_pushgateway
                        # we can not specify labels from the metrics to be removed
                        # if we need to remove the metrics separately, we need to give them a separate
                        # grouping key, and probably a different registry also
                        delete_from_gateway(self.pushgateway, job=self._PUSH_JOB)

                        msg = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
                        logging.info(msg)
                        return msg

        return 'Error stopping monitoring metric: {0} on {1}:{2}'.format(metric, vnf_name, vnf_interface)

    # get all metrics defined in the list and export it to Prometheus
    def get_flow_metrics(self):
        """Thread body: poll flow stats for every entry of ``flow_metrics``.

        Runs until ``start_monitoring`` becomes False; each round holds
        ``monitor_flow_lock``, queries Ryu per entry, updates the gauges and
        pushes the registry to the pushgateway.
        """
        while self.start_monitoring:

            with self.monitor_flow_lock:

                for flow_dict in self.flow_metrics:
                    data = {'cookie': flow_dict['cookie'],
                            'cookie_mask': COOKIE_MASK}

                    if 'tx' in flow_dict['metric_key']:
                        data['match'] = {'in_port': flow_dict['mon_port']}
                    elif 'rx' in flow_dict['metric_key']:
                        data['out_port'] = flow_dict['mon_port']

                    try:
                        # query Ryu
                        ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                        flow_stat_dict = self._parse_stats_reply(ret)
                        logging.debug('received flow stat:{0} '.format(flow_stat_dict))
                        self.set_flow_metric(flow_dict, flow_stat_dict)
                    except Exception:
                        # one bad reply must not kill the polling thread
                        logging.exception('error while updating flow metric')

                try:
                    if len(self.flow_metrics) > 0:
                        pushadd_to_gateway(self.pushgateway, job=self._PUSH_JOB, registry=self.registry)
                except Exception as e:
                    logging.warning("Pushgateway not reachable: {0} {1}".format(Exception, e))

            # NOTE(review): poll interval restored as 1 second -- confirm.
            time.sleep(1)

    def get_network_metrics(self):
        """Thread body: poll port stats for every entry of ``network_metrics``.

        Runs until ``start_monitoring`` becomes False; each round holds
        ``monitor_lock``, queries Ryu once per switch, updates the gauges and
        pushes the registry to the pushgateway.
        """
        while self.start_monitoring:

            with self.monitor_lock:

                # group metrics by dpid to optimize the rest api calls
                dpid_set = set(metric_dict['switch_dpid'] for metric_dict in self.network_metrics)

                for dpid in dpid_set:
                    try:
                        # query Ryu
                        ret = self.net.ryu_REST('stats/port', dpid=dpid)
                        port_stat_dict = self._parse_stats_reply(ret)

                        metric_list = [metric_dict for metric_dict in self.network_metrics
                                       if int(metric_dict['switch_dpid']) == int(dpid)]

                        for metric_dict in metric_list:
                            self.set_network_metric(metric_dict, port_stat_dict)
                    except Exception:
                        # one bad reply must not kill the polling thread
                        logging.exception('error while updating network metric')

                try:
                    if len(self.network_metrics) > 0:
                        pushadd_to_gateway(self.pushgateway, job=self._PUSH_JOB, registry=self.registry)
                except Exception as e:
                    logging.warning("Pushgateway not reachable: {0} {1}".format(Exception, e))

            # NOTE(review): poll interval restored as 1 second -- confirm.
            time.sleep(1)

    # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
    def set_network_metric(self, metric_dict, port_stat_dict):
        """Update the gauge of one port metric from a Ryu port-stats reply.

        :param metric_dict: entry of ``network_metrics``; its
            ``previous_measurement`` / ``previous_monitor_time`` are updated
        :param port_stat_dict: parsed reply keyed by the dpid as string, or None
        :return: None on success, an error message when the monitored port is
            not present in the reply
        """
        # vnf tx is the datacenter switch rx and vice-versa
        metric_key = self.switch_tx_rx(metric_dict['metric_key'])
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        mon_port = metric_dict['mon_port']

        # a missing/unparsable reply is treated like "port not found"
        port_stats = (port_stat_dict or {}).get(str(switch_dpid), [])

        for port_stat in port_stats:
            try:
                if int(port_stat['port_no']) != int(mon_port):
                    continue
            except (TypeError, ValueError):
                # a non-numeric port identifier can never be the monitored port
                continue

            port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
            this_measurement = int(port_stat[metric_key])

            # set prometheus metric
            self.prom_metrics[metric_dict['metric_key']].\
                labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=None).\
                set(this_measurement)

            # remember the sample; the rate itself is not computed here
            # (it can be easily queried from prometheus)
            metric_dict['previous_measurement'] = this_measurement
            metric_dict['previous_monitor_time'] = port_uptime
            return None

        msg = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
        logging.error(msg)
        logging.error('monport:{0}, dpid:{1}'.format(mon_port, switch_dpid))
        logging.error('port dict:{0}'.format(port_stat_dict))
        return msg

    def set_flow_metric(self, metric_dict, flow_stat_dict):
        """Update the gauge of one flow metric from a Ryu flow-stats reply.

        The byte or packet counts (depending on ``metric_key``) of all flow
        entries returned for the switch are summed. Nothing is updated when
        the reply is empty or does not contain the switch.
        """
        metric_key = metric_dict['metric_key']
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        cookie = metric_dict['cookie']

        if not flow_stat_dict or str(switch_dpid) not in flow_stat_dict:
            logging.debug('no flow stats for dpid {0}'.format(switch_dpid))
            return

        counter = 0
        for flow_stat in flow_stat_dict[str(switch_dpid)]:
            if 'bytes' in metric_key:
                counter += flow_stat['byte_count']
            elif 'packet' in metric_key:
                counter += flow_stat['packet_count']

        self.prom_metrics[metric_dict['metric_key']]. \
            labels(vnf_name=vnf_name, vnf_interface=vnf_interface, flow_id=cookie). \
            set(counter)

    def start_Prometheus(self, port=9090):
        """Launch a Prometheus container through the docker CLI and return the Popen handle.

        NOTE(review): "docker run --rm", the image name and the return value
        are missing from the garbled source and are assumed here -- confirm.
        """
        # prometheus.yml configuration file is located in the same directory as this file
        here = os.path.dirname(os.path.abspath(__file__))
        cmd = ["docker",
               "run",
               "--rm",
               "-p", "{0}:9090".format(port),
               "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(here),
               "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(here),
               "--name", "prometheus",
               "prom/prometheus"
               ]
        logging.info('Start Prometheus container {0}'.format(cmd))
        return Popen(cmd)

    def start_PushGateway(self, port=PUSHGATEWAY_PORT):
        """Launch the Prometheus pushgateway container and return the Popen handle.

        NOTE(review): "docker run -d", the image name and the return value
        are missing from the garbled source and are assumed here -- confirm.
        """
        cmd = ["docker",
               "run",
               "-d",
               "-p", "{0}:9091".format(port),
               "--name", "pushgateway",
               "--label", 'com.containernet=""',
               "prom/pushgateway"
               ]

        logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
        return Popen(cmd)

    def start_cAdvisor(self, port=CADVISOR_PORT):
        """Launch the cAdvisor container and return the Popen handle.

        NOTE(review): "docker run --rm", "--name=cadvisor" (the name that
        stop() removes) and the return value are missing from the garbled
        source and are assumed here -- confirm.
        """
        cmd = ["docker",
               "run",
               "--rm",
               "--volume=/:/rootfs:ro",
               "--volume=/var/run:/var/run:rw",
               "--volume=/sys:/sys:ro",
               "--volume=/var/lib/docker/:/var/lib/docker:ro",
               "--publish={0}:8080".format(port),
               "--name=cadvisor",
               "--label", 'com.containernet=""',
               "google/cadvisor:latest"
               ]
        logging.info('Start cAdvisor container {0}'.format(cmd))
        return Popen(cmd)

    # NOTE(review): the "def" line of this method is missing from the garbled
    # source; the name stop() is assumed -- confirm against the callers.
    def stop(self):
        """Stop both polling threads and remove the helper containers."""
        # stop the monitoring thread
        self.start_monitoring = False
        self.monitor_thread.join()
        self.monitor_flow_thread.join()

        # these containers are used for monitoring but are started now outside of son-emu

        if self.pushgateway_process is not None:
            logging.info('stopping pushgateway container')
            self._stop_container('pushgateway')

        if self.cadvisor_process is not None:
            logging.info('stopping cadvisor container')
            self._stop_container('cadvisor')

    def switch_tx_rx(self, metric=''):
        """Swap 'tx' and 'rx' in a metric name; other names are returned unchanged."""
        # when monitoring vnfs, the tx of the datacenter switch is actually the rx of the vnf
        # so we need to change the metric name to be consistent with the vnf rx or tx
        if 'tx' in metric:
            metric = metric.replace('tx', 'rx')
        elif 'rx' in metric:
            metric = metric.replace('rx', 'tx')

        return metric

    def _stop_container(self, name):
        """Force-remove the docker container called ``name``; a missing container is only logged."""
        try:
            container = self.dockercli.containers.get(name)
        except docker.errors.NotFound:
            # already gone: keep going so that the remaining cleanup still runs
            logging.warning('container {0} not found'.format(name))
            return
        container.remove(force=True)

    def update_skewmon(self, vnf_name, resource_name, action):
        """Add or remove a VNF resource in the skewness-monitor config and manage its container.

        The config is kept as JSON in /tmp/skewmon.cfg. The 'skewmon'
        container is removed when the config becomes empty and started when
        it is not found.

        :param vnf_name: VNF name; its container is looked up as 'mn.<vnf_name>'
        :param resource_name: name of the monitored resource
        :param action: 'start' or 'stop'; any other value leaves the config unchanged
        :return: a status message
        """
        ret = ''
        config_file_path = '/tmp/skewmon.cfg'

        # start from an empty config when nothing is monitored yet,
        # otherwise continue from what is stored on disk
        config = {}
        if len(self.skewmon_metrics) > 0:
            try:
                with open(config_file_path, 'r') as configfile:
                    config = json.load(configfile)
            except (IOError, ValueError):
                # missing file, not a valid json file or empty
                config = {}

        docker_name = 'mn.' + vnf_name
        vnf_container = self.dockercli.containers.get(docker_name)
        key = resource_name + '_' + vnf_container.short_id
        vnf_id = vnf_container.id

        if action == 'start':
            # add a new vnf to monitor
            # NOTE(review): the VNF_ID entry is restored from the otherwise
            # unused vnf_id local -- confirm.
            config[key] = dict(VNF_NAME=vnf_name,
                               VNF_ID=vnf_id,
                               VNF_METRIC=resource_name)
            ret = 'adding to skewness monitor: {0} {1} '.format(vnf_name, resource_name)
            logging.info(ret)
        elif action == 'stop':
            # remove vnf to monitor (tolerate a key that was never added)
            config.pop(key, None)
            ret = 'removing from skewness monitor: {0} {1} '.format(vnf_name, resource_name)
            logging.info(ret)

        self.skewmon_metrics = config
        with open(config_file_path, 'w') as configfile:
            json.dump(config, configfile)

        try:
            skewmon_container = self.dockercli.containers.get('skewmon')

            # remove container if config is empty
            if len(config) == 0:
                ret += 'stopping skewness monitor'
                logging.info('stopping skewness monitor')
                skewmon_container.remove(force=True)

        except docker.errors.NotFound:
            # start container if not running
            ret += 'starting skewness monitor'
            logging.info('starting skewness monitor')
            volumes = {'/sys/fs/cgroup': {'bind': '/sys/fs/cgroup', 'mode': 'ro'},
                       '/tmp/skewmon.cfg': {'bind': '/config.txt', 'mode': 'ro'}}
            # NOTE(review): detach/volumes/name are restored from context
            # (the volumes local above, and the lookup by name 'skewmon') -- confirm.
            self.dockercli.containers.run('skewmon',
                                          detach=True,
                                          volumes=volumes,
                                          labels=['com.containernet'],
                                          name='skewmon'
                                          )
            # Wait a while for containers to be completely started
            # NOTE(review): the filter waits for a container named 'prometheus',
            # not 'skewmon' -- kept as in the source, confirm it is intended.
            # The retry budget (~5 polls, 1 s apart) is restored by assumption.
            wait_time = 0
            while True:
                list1 = self.dockercli.containers.list(filters={'status': 'running', 'name': 'prometheus'})
                if len(list1) >= 1:
                    time.sleep(1)
                    break
                if wait_time > 5:
                    return 'skewmon not started'
                time.sleep(1)
                wait_time += 1

        return ret