54e79869de4c5305f6fafd6a74cdd1956b39cd15
1 __author__
= 'Administrator'
5 from mininet
.node
import OVSSwitch
8 from prometheus_client
import start_http_server
, Summary
, Histogram
, Gauge
, Counter
, REGISTRY
, CollectorRegistry
, \
9 pushadd_to_gateway
, push_to_gateway
, delete_from_gateway
11 from subprocess
import Popen
, PIPE
14 logging
.basicConfig(level
=logging
.INFO
)
17 class to read openflow stats from the Ryu controller of the DCNetwork
20 class DCNetworkMonitor():
# Constructor of the monitor. Takes the emulated network object `net`, builds
# the Ryu REST base URL, declares the Prometheus gauges, starts the polling
# thread and finally launches the helper processes.
# NOTE(review): this listing is a token-split excerpt with gaps. The
# assignments of self.net, self.ip and self.port that the code below and the
# other methods rely on are not visible here -- confirm against the full file.
21 def __init__(self
, net
):
23 # link to Ryu REST_API
# Base URL later extended by REST_cmd with '/<prefix>/<dpid>'.
26 self
.REST_api
= 'http://{0}:{1}'.format(self
.ip
,self
.port
)
28 # helper variables to calculate the metrics
# host:port string handed to pushadd_to_gateway / delete_from_gateway.
29 self
.pushgateway
= 'localhost:9091'
30 # Start up the server to expose the metrics to Prometheus.
31 #start_http_server(8000)
32 # supported Prometheus metrics
# A private registry (not the global REGISTRY) holds the four gauges below;
# every gauge is labelled by (vnf_name, vnf_interface).
33 self
.registry
= CollectorRegistry()
34 self
.prom_tx_packet_count
= Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
35 ['vnf_name', 'vnf_interface'], registry
=self
.registry
)
36 self
.prom_rx_packet_count
= Gauge('sonemu_rx_count_packets', 'Total number of packets received',
37 ['vnf_name', 'vnf_interface'], registry
=self
.registry
)
38 self
.prom_tx_byte_count
= Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
39 ['vnf_name', 'vnf_interface'], registry
=self
.registry
)
40 self
.prom_rx_byte_count
= Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
41 ['vnf_name', 'vnf_interface'], registry
=self
.registry
)
# Lookup table from the metric key used by setup_metric/stop_metric to its gauge.
43 self
.prom_metrics
={'tx_packets':self
.prom_tx_packet_count
, 'rx_packets':self
.prom_rx_packet_count
,
44 'tx_bytes':self
.prom_tx_byte_count
,'rx_bytes':self
.prom_rx_byte_count
}
46 # list of installed metrics to monitor
47 # each entry can contain this data
# NOTE(review): the two lines below look like the remains of a descriptive
# block listing the per-metric dict fields; its delimiters and other fields
# are not visible in this excerpt.
53 previous_measurement = 0
54 previous_monitor_time = 0
# monitor_lock guards network_metrics, which is shared between the polling
# thread and setup_metric/stop_metric.
59 self
.monitor_lock
= threading
.Lock()
60 self
.network_metrics
= []
62 # start monitoring thread
# NOTE(review): the thread is started before the helper processes below are
# launched, and no daemon flag is set in the visible code -- TODO confirm that
# shutdown always clears start_monitoring and joins the thread.
63 self
.start_monitoring
= True
64 self
.monitor_thread
= threading
.Thread(target
=self
.get_network_metrics
)
65 self
.monitor_thread
.start()
# Whatever these helpers return is kept so the shutdown code can call
# terminate()/kill() on it.
68 self
.pushgateway_process
= self
.start_PushGateway()
69 self
.prometheus_process
= self
.start_Prometheus()
70 self
.cadvisor_process
= self
.start_cadvisor()
72 # first set some parameters, before measurement can start
def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
    """Register one port counter of a VNF interface for periodic monitoring.

    Looks up, in ``self.net.DCNetwork_graph``, the switch port that faces
    ``vnf_name:vnf_interface`` and appends a bookkeeping dict to
    ``self.network_metrics`` with the keys ``vnf_name``, ``vnf_interface``,
    ``mon_port``, ``switch_dpid``, ``metric_key``, ``previous_measurement``
    and ``previous_monitor_time``.

    :param vnf_name: node name of the VNF in the network graph.
    :param vnf_interface: value compared with each link's ``src_port_id``.
        When None, the ``src_port_id`` of edge key 0 towards the first
        neighbour is used.
    :param metric: metric key stored as ``metric_key`` (e.g. 'tx_packets');
        None falls back to 'tx_packets'.
    :return: a human-readable status string: the 'Started monitoring' message
        on success, otherwise the reason the metric was not installed (or
        the text of an unexpected exception).
    """
    # default port direction to monitor
    if metric is None:
        metric = 'tx_packets'

    try:
        graph = self.net.DCNetwork_graph
        # list() keeps this working whether neighbors() hands back a list
        # or an iterator.
        neighbours = list(graph.neighbors(vnf_name))

        # check if port is specified (vnf:port)
        if vnf_interface is None:
            if not neighbours:
                msg = "vnf: {0} is not connected".format(vnf_name)
                logging.info(msg)
                return msg
            # take first interface by default
            vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

        network_metric = {
            'vnf_name': vnf_name,
            'vnf_interface': vnf_interface,
        }

        # find the link whose VNF-side port matches; its dst_port is the
        # switch port whose counters have to be read
        for connected_sw in neighbours:
            link_dict = graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    network_metric['mon_port'] = link_dict[link]['dst_port']
                    break

        if 'mon_port' not in network_metric:
            msg = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
            # not inside an exception handler, so there is no traceback to log
            logging.error(msg)
            return msg

        if len(neighbours) > 1:
            msg = "vnf: {0} has multiple ports".format(vnf_name)
            logging.info(msg)
            return msg

        next_node = self.net.getNodeByName(neighbours[0])
        if not isinstance(next_node, OVSSwitch):
            msg = "vnf: {0} is not connected to switch".format(vnf_name)
            logging.info(msg)
            return msg

        network_metric['previous_measurement'] = 0
        network_metric['previous_monitor_time'] = 0
        # the dpid attribute is a hex string; the REST API is addressed
        # with its integer value
        network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
        network_metric['metric_key'] = metric

        # the context manager releases the lock even if append() raises
        with self.monitor_lock:
            self.network_metrics.append(network_metric)

        msg = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(msg)
        return msg

    except Exception as ex:
        logging.exception("setup_metric error.")
        return str(ex)
def stop_metric(self, vnf_name, vnf_interface, metric):
    """Stop monitoring ``metric`` on ``vnf_name:vnf_interface``.

    Removes the first matching entry from ``self.network_metrics``, drops the
    ``(vnf_name, vnf_interface)`` label set from every collector of
    ``self.registry`` that holds it, and deletes the
    'sonemu-SDNcontroller' job from the push gateway.

    :param vnf_name: ``vnf_name`` of the entry to remove.
    :param vnf_interface: ``vnf_interface`` of the entry to remove.
    :param metric: ``metric_key`` of the entry to remove.
    :return: the 'Stopped monitoring' message, or None when no entry matches.
    :raises: whatever ``collector.remove`` or ``delete_from_gateway`` raise;
        the lock is released in that case as well.
    """
    # Search and removal happen under the lock so the polling thread never
    # sees a half-updated list; ``with`` guarantees the release even when the
    # push gateway call fails.
    with self.monitor_lock:
        for metric_dict in self.network_metrics:
            if (metric_dict['vnf_name'] != vnf_name
                    or metric_dict['vnf_interface'] != vnf_interface
                    or metric_dict['metric_key'] != metric):
                continue

            self.network_metrics.remove(metric_dict)

            # Unregistering the collector itself would remove the complete
            # metric with all its labels, so only this label set is removed.
            for collector in self.registry._collectors:
                logging.info('name:{0} labels:{1} metrics:{2}'.format(
                    collector._name, collector._labelnames, collector._metrics))
                # Example of the line logged above:
                #   name:sonemu_rx_count_packets
                #   labels:('vnf_name', 'vnf_interface')
                #   metrics:{(u'tsrc', u'output'): <prometheus_client.core.Gauge ...>}
                logging.info('{0}'.format(collector._metrics.values()))
                if (vnf_name, vnf_interface) in collector._metrics:
                    logging.info('2 name:{0} labels:{1} metrics:{2}'.format(
                        collector._name, collector._labelnames, collector._metrics))
                    collector.remove(vnf_name, vnf_interface)

            # There is 1 single monitor job for all metrics of the SDN
            # controller. From the push gateway we can only remove by the
            # grouping keys (labels) that were defined when pushing; we cannot
            # select individual metric labels. To remove metrics separately
            # they would need a separate grouping key, and probably a
            # different registry as well.
            delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')
            # stop right after the removal: the list was just modified
            break
        else:
            # nothing matched, nothing to stop
            return None

    msg = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
    logging.info(msg)
    return msg
190 # get all metrics defined in the list and export it to Prometheus
def get_network_metrics(self, interval=1.0):
    """Polling loop: export every installed metric until monitoring stops.

    While ``self.start_monitoring`` is true, the entries of
    ``self.network_metrics`` are grouped by ``switch_dpid``, the port
    statistics of each switch are fetched once through
    ``self.REST_cmd('stats/port', dpid)`` and handed, together with each
    entry of that switch, to ``self.set_network_metric``.

    A failing round (controller unreachable, malformed reply, ...) is logged
    and the loop carries on, so one error neither ends monitoring nor leaves
    ``self.monitor_lock`` held.

    :param interval: seconds to wait between two polling rounds; keeps the
        loop from busy-spinning on the controller.
    """
    import json
    import time

    while self.start_monitoring:
        try:
            with self.monitor_lock:
                # group metrics by dpid to optimize the rest api calls
                metrics_by_dpid = {}
                for metric_dict in self.network_metrics:
                    dpid = int(metric_dict['switch_dpid'])
                    metrics_by_dpid.setdefault(dpid, []).append(metric_dict)

                for dpid, metric_list in metrics_by_dpid.items():
                    # query Ryu; the reply is a JSON document
                    ret = self.REST_cmd('stats/port', dpid)
                    port_stat_dict = json.loads(ret)

                    for metric_dict in metric_list:
                        self.set_network_metric(metric_dict, port_stat_dict)
        except Exception:
            logging.exception('get_network_metrics error.')

        time.sleep(interval)
215 # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
def set_network_metric(self, metric_dict, port_stat_dict):
    """Export one installed metric from a parsed Ryu port-stats reply.

    Finds the entry of ``port_stat_dict[str(switch_dpid)]`` whose ``port_no``
    equals the monitored port, sets the matching Prometheus gauge to the
    counter value, pushes the registry to the push gateway and updates the
    ``previous_measurement`` / ``previous_monitor_time`` bookkeeping in
    ``metric_dict``.

    This method does not touch ``self.monitor_lock``; locking is left to the
    caller.

    :param metric_dict: bookkeeping dict of one installed metric (keys
        ``metric_key``, ``switch_dpid``, ``vnf_name``, ``vnf_interface``,
        ``mon_port``, ``previous_measurement``, ``previous_monitor_time``).
    :param port_stat_dict: mapping of dpid string to a list of per-port
        statistic dicts.
    :return: the counter rate per second since the previous call, None for a
        baseline measurement (first call, or port uptime went backwards), or
        a 'metric ... not found' message when the port is not in the reply.
    """
    # vnf tx is the datacenter switch rx and vice-versa
    metric_key = self.switch_tx_rx(metric_dict['metric_key'])
    switch_dpid = metric_dict['switch_dpid']
    vnf_name = metric_dict['vnf_name']
    vnf_interface = metric_dict['vnf_interface']
    mon_port = int(metric_dict['mon_port'])

    # .get(): a reply without this dpid is reported as "not found" below
    for port_stat in port_stat_dict.get(str(switch_dpid), []):
        try:
            port_no = int(port_stat['port_no'])
        except (TypeError, ValueError):
            # reserved ports can be reported by name (e.g. "LOCAL"); they are
            # never the monitored port, so skip them
            continue
        if port_no != mon_port:
            continue

        port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
        this_measurement = int(port_stat[metric_key])

        # set prometheus metric; label values are given positionally in the
        # declared order (vnf_name, vnf_interface)
        self.prom_metrics[metric_dict['metric_key']].labels(
            vnf_name, vnf_interface).set(this_measurement)

        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

        previous_monitor_time = metric_dict['previous_monitor_time']
        metric_rate = None
        if 0 < previous_monitor_time < port_uptime:
            time_delta = port_uptime - previous_monitor_time
            metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)
        # else: first measurement (or the port counters were reset); only
        # the baseline is stored

        metric_dict['previous_measurement'] = this_measurement
        metric_dict['previous_monitor_time'] = port_uptime
        return metric_rate

    msg = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
    # not inside an exception handler, so there is no traceback to log
    logging.error(msg)
    return msg
def REST_cmd(self, prefix, dpid):
    """Issue a GET request to the controller REST API and return the body.

    The URL is ``self.REST_api + '/' + prefix + '/' + dpid``.

    :param prefix: API path below the base URL, e.g. 'stats/port'.
    :param dpid: datapath id appended as the last path element.
    :return: the raw response body as returned by ``read()``.
    :raises: whatever ``urllib2.urlopen`` raises for connection or HTTP
        errors.
    """
    from contextlib import closing

    url = self.REST_api + '/' + str(prefix) + '/' + str(dpid)
    req = urllib2.Request(url)
    # closing() releases the connection even if read() fails; the response
    # object is not used as a context manager directly
    with closing(urllib2.urlopen(req)) as response:
        return response.read()
# Builds and logs the argument list `cmd` for starting a container named
# "prometheus"; `port` (default 9090) is the left-hand side of the "-p"
# mapping onto 9090.
# NOTE(review): the opening of the `cmd` list, the image name and the
# return statement are not visible in this excerpt. __init__ stores this
# method's result in self.prometheus_process and the shutdown code calls
# terminate()/kill() on it -- presumably a subprocess.Popen; TODO confirm.
270 def start_Prometheus(self
, port
=9090):
271 # prometheus.yml configuration file is located in the same directory as this file
275 "-p", "{0}:9090".format(port
),
# The volume argument maps prometheus.yml from the directory of this source
# file to /etc/prometheus/prometheus.yml.
276 "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(os
.path
.dirname(os
.path
.abspath(__file__
))),
277 "--name", "prometheus",
280 logging
.info('Start Prometheus container {0}'.format(cmd
))
# Builds and logs the argument list `cmd` for starting a container named
# "pushgateway"; `port` (default 9091) is the left-hand side of the "-p"
# mapping onto 9091.
# NOTE(review): the opening of the `cmd` list, the image name and the
# return statement are not visible in this excerpt. __init__ stores this
# method's result in self.pushgateway_process -- presumably a
# subprocess.Popen; TODO confirm.
283 def start_PushGateway(self
, port
=9091):
287 "-p", "{0}:9091".format(port
),
288 "--name", "pushgateway",
292 logging
.info('Start Prometheus Push Gateway container {0}'.format(cmd
))
# Builds and logs the argument list `cmd` for starting the
# "google/cadvisor:latest" image; `port` (default 8090) is published onto
# 8080.
# NOTE(review): the opening of the `cmd` list, a possible container-name
# argument and the return statement are not visible in this excerpt.
# __init__ stores this method's result in self.cadvisor_process and the
# shutdown code calls _stop_container('cadvisor') -- TODO confirm the
# container is actually given that name.
295 def start_cadvisor(self
, port
=8090):
# Volume arguments: /, /sys and /var/lib/docker/ are mapped read-only,
# /var/run read-write.
299 "--volume=/:/rootfs:ro",
300 "--volume=/var/run:/var/run:rw",
301 "--volume=/sys:/sys:ro",
302 "--volume=/var/lib/docker/:/var/lib/docker:ro",
303 "--publish={0}:8080".format(port
),
305 "google/cadvisor:latest"
307 logging
.info('Start cAdvisor container {0}'.format(cmd
))
# Shutdown sequence. NOTE(review): the enclosing `def` line is not visible in
# this excerpt -- confirm the method name and signature against the full file.
# Order: end the polling loop and wait for its thread, then for each helper
# process that is not None call terminate(), kill() and
# _stop_container(<name>).
311 # stop the monitoring thread
# Clearing the flag ends the `while self.start_monitoring` loop of
# get_network_metrics; join() then blocks until that thread has returned.
312 self
.start_monitoring
= False
313 self
.monitor_thread
.join()
# NOTE(review): kill() directly follows terminate() with no wait in between,
# for all three processes below -- TODO confirm no grace period is intended.
315 if self
.prometheus_process
is not None:
316 logging
.info('stopping prometheus container')
317 self
.prometheus_process
.terminate()
318 self
.prometheus_process
.kill()
319 self
._stop
_container
('prometheus')
321 if self
.pushgateway_process
is not None:
322 logging
.info('stopping pushgateway container')
323 self
.pushgateway_process
.terminate()
324 self
.pushgateway_process
.kill()
325 self
._stop
_container
('pushgateway')
327 if self
.cadvisor_process
is not None:
328 logging
.info('stopping cadvisor container')
329 self
.cadvisor_process
.terminate()
330 self
.cadvisor_process
.kill()
331 self
._stop
_container
('cadvisor')
def switch_tx_rx(self, metric=''):
    """Return ``metric`` with its direction swapped ('tx' <-> 'rx').

    When monitoring vnfs, the tx of the datacenter switch is actually the rx
    of the vnf, so the metric name has to be changed to stay consistent with
    the vnf's rx or tx.

    :param metric: metric key such as 'tx_packets' or 'rx_bytes'.
    :return: the key with 'tx' replaced by 'rx', or else 'rx' replaced by
        'tx'; a key containing neither is returned unchanged.
    """
    # The two replacements must be mutually exclusive: applying both in
    # sequence would turn 'tx' into 'rx' and straight back into 'tx'.
    if 'tx' in metric:
        return metric.replace('tx', 'rx')
    if 'rx' in metric:
        return metric.replace('rx', 'tx')
    return metric
343 def _stop_container(self
, name
):