1 __author__
= 'Administrator'
5 from mininet
.node
import OVSSwitch
8 from prometheus_client
import start_http_server
, Summary
, Histogram
, Gauge
, Counter
, REGISTRY
, CollectorRegistry
, \
9 pushadd_to_gateway
, push_to_gateway
, delete_from_gateway
11 from subprocess
import Popen
, PIPE
17 logging
.basicConfig(level
=logging
.INFO
)
20 class to read openflow stats from the Ryu controller of the DCNetwork
23 class DCNetworkMonitor():
def __init__(self, net):
    """Set up the Prometheus gauges, start the polling threads and cAdvisor.

    :param net: the emulated network; the other methods of this class use
        its ``DCNetwork_graph``, ``getNodeByName`` and ``ryu_REST`` members.
    """
    # Every other method reaches the network through self.net, so it has to
    # be stored before the monitoring threads are started below.
    self.net = net

    prometheus_ip = '127.0.0.1'
    prometheus_port = '9090'
    self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)

    # address of the push gateway that receives the gauges
    self.pushgateway = 'localhost:9091'

    # supported Prometheus metrics, all kept in one private registry
    self.registry = CollectorRegistry()
    self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                      ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
    self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                      ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
    self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                    ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)
    self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                    ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)

    # metric key (as passed to setup_metric / setup_flow) -> gauge
    self.prom_metrics = {
        'tx_packets': self.prom_tx_packet_count,
        'rx_packets': self.prom_rx_packet_count,
        'tx_bytes': self.prom_tx_byte_count,
        'rx_bytes': self.prom_rx_byte_count,
    }

    # Lists of installed metrics to monitor.  Each entry is a dict with the
    # keys switch_dpid, vnf_name, vnf_interface, previous_measurement,
    # previous_monitor_time, metric_key and mon_port (flow entries also
    # carry a cookie).  Each list is guarded by its own lock because it is
    # shared with a polling thread.
    self.monitor_lock = threading.Lock()
    self.monitor_flow_lock = threading.Lock()
    self.network_metrics = []
    self.flow_metrics = []

    # start monitoring threads; both loops run while start_monitoring is True
    self.start_monitoring = True
    self.monitor_thread = threading.Thread(target=self.get_network_metrics)
    self.monitor_thread.start()

    self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
    self.monitor_flow_thread.start()

    # helper tools (push gateway and Prometheus containers are not started here)
    #self.pushgateway_process = self.start_PushGateway()
    #self.prometheus_process = self.start_Prometheus()
    self.cadvisor_process = self.start_cadvisor()
82 # first set some parameters, before measurement can start
def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
    """Start monitoring one flow (identified by ``cookie``) of a VNF interface.

    :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``.
    :param vnf_interface: interface (``src_port_id``) of the VNF; when None
        the first link to the first neighbour is used.
    :param metric: key of the gauge to export; None falls back to 'tx_packets'.
    :param cookie: cookie of the flow entries to query.
    :return: a status message; None when the neighbour is not an OVSSwitch.
    """
    graph = self.net.DCNetwork_graph

    # check if port is specified (vnf:port)
    if vnf_interface is None:
        # take first interface by default; list() keeps the indexing valid
        # when neighbors() hands back an iterator instead of a list
        neighbours = list(graph.neighbors(vnf_name))
        if neighbours:
            vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    flow_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    # find the switch and switch port at the other end of the interface
    vnf_switch = None
    for connected_sw in graph.neighbors(vnf_name):
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                vnf_switch = connected_sw
                flow_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if not vnf_switch:
        # logging.exception is only meaningful inside an except block
        not_found = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
        logging.error(not_found)
        return not_found

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        next_node = self.net.getNodeByName(vnf_switch)

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return None

        flow_metric['previous_measurement'] = 0
        flow_metric['previous_monitor_time'] = 0

        # the dpid attribute is a hex string; the REST calls use the integer
        flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        flow_metric['metric_key'] = metric
        flow_metric['cookie'] = cookie

        # the list is shared with the flow polling thread
        with self.monitor_flow_lock:
            self.flow_metrics.append(flow_metric)

        started = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
        logging.info(started)
        return started

    except Exception as ex:
        logging.exception("setup_flow error.")
        return str(ex)
def stop_flow(self, vnf_name, vnf_interface=None, metric=None, cookie=0):
    """Stop monitoring the flow metric that matches all four arguments.

    The entry is removed from ``self.flow_metrics``, the matching label set
    is removed from every collector in the registry, and the push-gateway
    job is deleted.

    :return: a status message, or None when no installed flow metric matches.
    """
    # The lock is held through a ``with`` block so that it is released even
    # when the gateway call raises; otherwise the polling thread would block
    # forever on its next acquire.
    with self.monitor_flow_lock:
        for flow_dict in self.flow_metrics:
            if (flow_dict['vnf_name'] == vnf_name
                    and flow_dict['vnf_interface'] == vnf_interface
                    and flow_dict['metric_key'] == metric
                    and flow_dict['cookie'] == cookie):
                break
        else:
            return None

        self.flow_metrics.remove(flow_dict)

        label_key = (vnf_name, vnf_interface, cookie)
        for collector in self.registry._collectors:
            if label_key in collector._metrics:
                collector.remove(vnf_name, vnf_interface, cookie)

        # one single job holds all metrics of the SDN controller
        delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

    stopped = 'Stopped monitoring flow {3}: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)
    logging.info(stopped)
    return stopped
165 # first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Start monitoring a port counter of a VNF interface.

    :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``.
    :param vnf_interface: interface (``src_port_id``) of the VNF; when None
        the first link to the first neighbour is used.
    :param metric: key of the gauge to export; None falls back to 'tx_packets'.
    :return: a status message; None when the VNF has no or several
        neighbours or when the neighbour is not an OVSSwitch.
    """
    graph = self.net.DCNetwork_graph

    # check if port is specified (vnf:port)
    if vnf_interface is None:
        # take first interface by default; list() keeps the indexing valid
        # when neighbors() hands back an iterator instead of a list
        neighbours = list(graph.neighbors(vnf_name))
        if neighbours:
            vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

    network_metric = {'vnf_name': vnf_name, 'vnf_interface': vnf_interface}

    # find the switch port at the other end of the interface
    for connected_sw in graph.neighbors(vnf_name):
        link_dict = graph[vnf_name][connected_sw]
        for link in link_dict:
            if link_dict[link]['src_port_id'] == vnf_interface:
                # found the right link and connected switch
                network_metric['mon_port'] = link_dict[link]['dst_port_nr']
                break

    if 'mon_port' not in network_metric:
        # logging.exception is only meaningful inside an except block
        not_found = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
        logging.error(not_found)
        return not_found

    try:
        # default port direction to monitor
        if metric is None:
            metric = 'tx_packets'

        vnf_switch = list(graph.neighbors(str(vnf_name)))

        if len(vnf_switch) > 1:
            logging.info("vnf: {0} has multiple ports".format(vnf_name))
            return None
        if not vnf_switch:
            logging.info("vnf: {0} is not connected".format(vnf_name))
            return None

        next_node = self.net.getNodeByName(vnf_switch[0])

        if not isinstance(next_node, OVSSwitch):
            logging.info("vnf: {0} is not connected to switch".format(vnf_name))
            return None

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0

        # the dpid attribute is a hex string; the REST calls use the integer
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        # the list is shared with the port polling thread
        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        started = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(started)
        return started

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
    """Stop monitoring port metrics of a VNF.

    With ``vnf_interface`` and ``metric`` both None, every installed metric
    of ``vnf_name`` is removed; otherwise only the entry matching all three
    arguments is removed.  The port-level label sets (flow id 'None') are
    removed from the collectors and the push-gateway job is deleted.

    :return: a status message, or None when nothing matched.
    """
    remove_all = vnf_interface is None and metric is None

    # ``with`` guarantees the lock is released when the gateway call raises.
    with self.monitor_lock:
        if remove_all:
            if not any(m['vnf_name'] == vnf_name for m in self.network_metrics):
                return None
            # drop every entry of this vnf, not only the first one found;
            # slice assignment keeps the same list object
            self.network_metrics[:] = [m for m in self.network_metrics
                                       if m['vnf_name'] != vnf_name]

            for collector in self.registry._collectors:
                # iterate over a snapshot: remove() mutates collector._metrics
                for name, interface, flow_id in list(collector._metrics):
                    # only the port-level label sets are removed, so
                    # remove() is never asked for a key that does not exist
                    if name == vnf_name and flow_id == 'None':
                        logging.info('3 name:{0} labels:{1} metrics:{2}'.format(
                            collector._name, collector._labelnames, collector._metrics))
                        collector.remove(name, interface, 'None')
        else:
            for metric_dict in self.network_metrics:
                if (metric_dict['vnf_name'] == vnf_name
                        and metric_dict['vnf_interface'] == vnf_interface
                        and metric_dict['metric_key'] == metric):
                    break
            else:
                return None
            self.network_metrics.remove(metric_dict)

            for collector in self.registry._collectors:
                logging.info('{0}'.format(collector._metrics.values()))
                if (vnf_name, vnf_interface, 'None') in collector._metrics:
                    logging.info('2 name:{0} labels:{1} metrics:{2}'.format(
                        collector._name, collector._labelnames, collector._metrics))
                    collector.remove(vnf_name, vnf_interface, 'None')

        # 1 single monitor job for all metrics of the SDN controller: the
        # gateway can only delete by grouping key, so the whole job goes
        delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

    if remove_all:
        logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
        return 'Stopped monitoring: {0}'.format(vnf_name)

    stopped = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
    logging.info(stopped)
    return stopped
301 # get all metrics defined in the list and export it to Prometheus
def get_flow_metrics(self):
    """Poll the flow statistics of every installed flow metric.

    Runs until ``self.start_monitoring`` becomes false.  For each entry of
    ``self.flow_metrics`` the flow stats are requested through
    ``self.net.ryu_REST`` and handed to ``set_flow_metric``.  Exceptions
    propagate to the caller, but the lock is always released first.
    """
    while self.start_monitoring:

        # ``with`` releases the lock even when a request or the parsing
        # fails, so setup_flow / stop_flow are never blocked by a dead poll
        with self.monitor_flow_lock:
            for flow_dict in self.flow_metrics:
                data = {'cookie': flow_dict['cookie']}

                # filter on the monitored switch port, by direction
                if 'tx' in flow_dict['metric_key']:
                    data['match'] = {'in_port': flow_dict['mon_port']}
                elif 'rx' in flow_dict['metric_key']:
                    data['out_port'] = flow_dict['mon_port']

                # query Ryu
                ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                flow_stat_dict = ast.literal_eval(ret)

                self.set_flow_metric(flow_dict, flow_stat_dict)

        # pause between polls, outside the lock, so other users of the
        # lock get a turn and the loop does not spin
        time.sleep(1)
def get_network_metrics(self):
    """Poll the port statistics of every installed network metric.

    Runs until ``self.start_monitoring`` becomes false.  The port stats are
    requested once per switch through ``self.net.ryu_REST`` and handed to
    ``set_network_metric`` for each metric on that switch.  Exceptions
    propagate to the caller, but the lock is always released first.
    """
    while self.start_monitoring:

        # ``with`` releases the lock even when a request or the parsing
        # fails, so setup_metric / stop_metric are never blocked
        with self.monitor_lock:
            # group metrics by dpid to optimize the rest api calls
            dpid_set = set(entry['switch_dpid'] for entry in self.network_metrics)

            for dpid in dpid_set:
                # query Ryu
                ret = self.net.ryu_REST('stats/port', dpid=dpid)
                port_stat_dict = ast.literal_eval(ret)

                metric_list = [entry for entry in self.network_metrics
                               if int(entry['switch_dpid']) == int(dpid)]
                for metric_dict in metric_list:
                    self.set_network_metric(metric_dict, port_stat_dict)

        # pause between polls, outside the lock, so other users of the
        # lock get a turn and the loop does not spin
        time.sleep(1)
352 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """Publish one port counter from a port-stats reply.

    Looks up the entry of ``port_stat_dict[str(switch_dpid)]`` whose
    ``port_no`` equals the monitored port, sets the matching gauge, pushes
    the registry to the gateway and remembers the measurement in
    ``metric_dict``.

    This method never touches ``self.monitor_lock`` and never re-enters the
    polling loop: releasing a lock owned by the caller and nesting another
    ``get_network_metrics`` loop inside the first measurement would leave
    the caller releasing an already released lock.

    :param metric_dict: one entry of ``self.network_metrics``.
    :param port_stat_dict: parsed reply, keyed by the dpid as a string.
    :return: None on success, an error message when the port is not found.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = metric_dict['mon_port']

    for port_stat in port_stat_dict[str(switch_dpid)]:
        if int(port_stat['port_no']) != int(mon_port):
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric (port-level values carry no flow id)
        self.prom_metrics[metric_dict['metric_key']].\
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\
            set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

        # keep the last sample as the baseline for the next poll
        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return None

    # logging.exception is only meaningful inside an except block
    not_found = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
    logging.error(not_found)
    return not_found
def set_flow_metric(self, metric_dict, flow_stat_dict):
    """Publish one flow counter from a flow-stats reply.

    Takes the first flow entry of ``flow_stat_dict[str(switch_dpid)]``,
    picks its byte or packet counter according to the metric key, sets the
    matching gauge (labelled with the flow cookie) and pushes the registry
    to the gateway.  Nothing is published when the reply has no flow entry
    for the switch or the metric key names neither bytes nor packets.

    :param metric_dict: one entry of ``self.flow_metrics``.
    :param flow_stat_dict: parsed reply, keyed by the dpid as a string.
    :return: None.
    """
    metric_key = metric_dict['metric_key']
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    cookie = metric_dict['cookie']

    # an empty reply must not raise inside the polling loop
    flow_stats = flow_stat_dict.get(str(switch_dpid))
    if not flow_stats:
        logging.warning('no flow stats for flow {2} on {0}:{1}'.format(vnf_name, vnf_interface, cookie))
        return None

    # TODO aggregate all found flow stats
    flow_stat = flow_stats[0]
    if 'bytes' in metric_key:
        counter = flow_stat['byte_count']
    elif 'packet' in metric_key:
        counter = flow_stat['packet_count']
    else:
        logging.error('flow metric {0} not supported on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))
        return None

    self.prom_metrics[metric_key]. \
        labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}). \
        set(counter)
    pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)
    return None
def query_Prometheus(self, query):
    """Run an instant query against the Prometheus REST API.

    :param query: query expression, appended as-is to the request URL.
    :return: the ``value`` field of the first result, or None when the
        reply status is not 'success' or the result set is empty.
    """
    # The reply is JSON; json.loads also accepts the literals
    # (true/false/null) that ast.literal_eval rejects.
    import json

    url = self.prometheus_REST_api + '/' + 'api/v1/query?query=' + query
    req = urllib2.Request(url)
    raw_reply = urllib2.urlopen(req).read()
    reply = json.loads(raw_reply)

    if reply['status'] != 'success':
        return None

    try:
        return reply['data']['result'][0]['value']
    except (KeyError, IndexError):
        # the query matched no series
        return None
452 def start_Prometheus(self
, port
=9090):
453 # prometheus.yml configuration file is located in the same directory as this file
457 "-p", "{0}:9090".format(port
),
458 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
459 "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
460 "--name", "prometheus",
463 logging
.info('Start Prometheus container {0}'.format(cmd
))
466 def start_PushGateway(self
, port
=9091):
470 "-p", "{0}:9091".format(port
),
471 "--name", "pushgateway",
475 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
478 def start_cadvisor(self
, port
=8090):
482 "--volume=/:/rootfs:ro",
483 "--volume=/var/run:/var/run:rw",
484 "--volume=/sys:/sys:ro",
485 "--volume=/var/lib/docker/:/var/lib/docker:ro",
486 "--publish={0}:8080".format(port
),
488 "google/cadvisor:latest"
490 logging
.info('Start cAdvisor container {0}'.format(cmd
))
494 # stop the monitoring thread
495 self
.start_monitoring
= False
496 self
.monitor_thread
.join()
497 self
.monitor_flow_thread
.join()
500 if self.prometheus_process is not None:
501 logging.info('stopping prometheus container')
502 self.prometheus_process.terminate()
503 self.prometheus_process.kill()
504 self._stop_container('prometheus')
506 if self.pushgateway_process is not None:
507 logging.info('stopping pushgateway container')
508 self.pushgateway_process.terminate()
509 self.pushgateway_process.kill()
510 self._stop_container('pushgateway')
513 if self
.cadvisor_process
is not None:
514 logging
.info('stopping cadvisor container')
515 self
.cadvisor_process
.terminate()
516 self
.cadvisor_process
.kill()
517 self
._stop
_container
('cadvisor')
def switch_tx_rx(self, metric=''):
    """Return ``metric`` with its direction swapped ('tx' <-> 'rx').

    When monitoring vnfs, the tx of the datacenter switch is actually the
    rx of the vnf, so the metric name is changed to be consistent with the
    vnf rx or tx.  Names containing neither are returned unchanged.
    """
    # The two replacements must be exclusive: applying both in sequence
    # would undo the swap.
    if 'tx' in metric:
        return metric.replace('tx', 'rx')
    if 'rx' in metric:
        return metric.replace('rx', 'tx')
    return metric
529 def _stop_container(self
, name
):
540 def profile(self
, mgmt_ip
, rate
, input_ip
, vnf_uuid
):
542 ssh
= paramiko
.SSHClient()
543 ssh
.set_missing_host_key_policy(paramiko
.AutoAddPolicy())
544 #ssh.connect(mgmt_ip, username='steven', password='test')
545 ssh
.connect(mgmt_ip
, username
='root', password
='root')
547 iperf_cmd
= 'iperf -c {0} -u -l18 -b{1}M -t1000 &'.format(input_ip
, rate
)
549 stdin
, stdout
, stderr
= ssh
.exec_command(iperf_cmd
)
551 start_time
= time
.time()
552 query_cpu
= '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid
, 1)
553 while (time
.time() - start_time
) < 15:
554 data
= self
.query_Prometheus(query_cpu
)
555 # logging.info('rate: {1} data:{0}'.format(data, rate))
559 query_cpu2
= '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid
, 8)
560 cpu_load
= float(self
.query_Prometheus(query_cpu2
)[1])
561 output
= 'rate: {1}Mbps; cpu_load: {0}%'.format(round(cpu_load
* 100, 2), rate
)
563 logging
.info(output_line
)
565 stop_iperf
= 'pkill -9 iperf'
566 stdin
, stdout
, stderr
= ssh
.exec_command(stop_iperf
)