1f7ff2ed8d3b1121e9cac3e3d7ddc5ee5a43e1af
1 __author__
= 'Administrator'
5 from mininet
.node
import OVSSwitch
8 from prometheus_client
import start_http_server
, Summary
, Histogram
, Gauge
, Counter
# Configure the root logger once at import time so that the INFO messages
# emitted by the monitor below are not filtered out.
logging.basicConfig(level=logging.INFO)
14 Class to read OpenFlow stats from the Ryu controller of the DCNetwork.
class DCNetworkMonitor():
    """Read OpenFlow port statistics from the Ryu controller of a DCNetwork.

    Port counters are fetched from the Ryu REST API (``stats/port/<dpid>``)
    and published as Prometheus gauges labelled with the VNF name and the
    VNF interface.  Metrics are registered with :meth:`setup_metric`; a
    background thread refreshes every registered metric once per
    ``poll_interval`` seconds until :meth:`stop` is called.
    """

    # seconds the monitoring thread waits between two polling passes
    poll_interval = 1

    def __init__(self, net):
        """Expose the Prometheus endpoint and start the monitoring thread.

        :param net: the network to monitor; :meth:`setup_metric` reads its
            ``DCNetwork_graph`` attribute and calls its ``getNodeByName()``.
        """
        self.net = net

        # link to Ryu REST_API
        # NOTE(review): host and port of the Ryu REST API are assumed here
        # (controller on the same host, Ryu's default WSGI port) -- confirm.
        self.ip = '127.0.0.1'
        self.port = '8080'
        self.REST_api = 'http://{0}:{1}'.format(self.ip, self.port)

        # helper variables of the single-metric measurement in get_rate()
        # TODO put these in a list to support multiple metrics simultaneously
        self.switch_dpid = 0
        self.vnf_name = None
        self.vnf_interface = None
        self.mon_port = None
        self.previous_measurement = 0
        self.previous_monitor_time = 0
        self.metric_key = None

        # Start up the server to expose the metrics to Prometheus.
        start_http_server(8000)

        # supported Prometheus metrics
        labels = ['vnf_name', 'vnf_interface']
        self.prom_tx_packet_count = Gauge(
            'sonemu_tx_count_packets', 'Total number of packets sent', labels)
        self.prom_rx_packet_count = Gauge(
            'sonemu_rx_count_packets', 'Total number of packets received', labels)
        self.prom_tx_byte_count = Gauge(
            'sonemu_tx_count_bytes', 'Total number of bytes sent', labels)
        self.prom_rx_byte_count = Gauge(
            'sonemu_rx_count_bytes', 'Total number of bytes received', labels)

        # metric name (as used in the Ryu port statistics) -> gauge
        self.prom_metrics = {
            'tx_packets': self.prom_tx_packet_count,
            'rx_packets': self.prom_rx_packet_count,
            'tx_bytes': self.prom_tx_byte_count,
            'rx_bytes': self.prom_rx_byte_count,
        }

        # list of installed metrics to monitor; each entry is a dict with the
        # keys switch_dpid, vnf_name, vnf_interface, previous_measurement,
        # previous_monitor_time, metric_key and mon_port
        self.network_metrics = []

        # start monitoring thread; it is a daemon so that a caller that never
        # calls stop() can still terminate the interpreter
        self.monitor_stop = threading.Event()
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop(self):
        """Ask the monitoring thread to finish and wait until it has."""
        self.monitor_stop.set()
        self.monitor_thread.join()

    def _monitor_loop(self):
        """Refresh all registered metrics periodically until stop() is called."""
        while not self.monitor_stop.is_set():
            try:
                self.get_network_metrics()
            except Exception:
                # one failed pass (e.g. REST API not reachable) must not end
                # the monitoring thread; the next pass simply tries again
                logging.exception("get_network_metrics error.")
            self.monitor_stop.wait(self.poll_interval)

    # first set some parameters, before measurement can start
    def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
        """Register a port counter of a VNF interface for monitoring.

        :param vnf_name: name of the VNF node in ``self.net.DCNetwork_graph``
        :param vnf_interface: ``src_port_id`` of the link to monitor; when
            None, the link with key 0 towards the first neighbour is used
        :param metric: one of the keys of ``self.prom_metrics``; None selects
            'tx_packets'
        :return: a message describing the outcome (success or the reason why
            the metric was not installed)
        """
        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'
            if metric not in self.prom_metrics:
                msg = "metric {0} is not supported".format(metric)
                logging.error(msg)
                return msg

            graph = self.net.DCNetwork_graph
            # list() so that indexing and len() also work when neighbors()
            # hands back an iterator instead of a list
            neighbours = list(graph.neighbors(vnf_name))
            if not neighbours:
                msg = "vnf: {0} is not connected".format(vnf_name)
                logging.info(msg)
                return msg
            if len(neighbours) > 1:
                logging.info("vnf: {0} has multiple ports".format(vnf_name))

            # check if port is specified (vnf:port)
            if vnf_interface is None:
                # take first interface by default
                vnf_interface = graph[vnf_name][neighbours[0]][0]['src_port_id']

            # find the link that starts at the interface; its far end is the
            # switch (and switch port) whose counters have to be read
            mon_switch = None
            mon_port = None
            for connected_sw in neighbours:
                link_dict = graph[vnf_name][connected_sw]
                for link in link_dict:
                    if link_dict[link]['src_port_id'] == vnf_interface:
                        mon_switch = connected_sw
                        mon_port = link_dict[link]['dst_port']
                        break
                if mon_switch is not None:
                    break

            if mon_switch is None:
                msg = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
                logging.error(msg)
                return msg

            next_node = self.net.getNodeByName(mon_switch)
            if not isinstance(next_node, OVSSwitch):
                msg = "vnf: {0} is not connected to switch".format(vnf_name)
                logging.info(msg)
                return msg

            network_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                # the dpid attribute is parsed as a hexadecimal string
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
            }
            self.network_metrics.append(network_metric)

            msg = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
            logging.info(msg)
            return msg

        except Exception as ex:
            logging.exception("setup_metric error.")
            return str(ex)

    # get all metrics defined in the list
    def get_network_metrics(self):
        """Run one polling pass over every registered metric.

        The port statistics of each switch are requested once and then
        applied to all metrics installed on that switch.
        """
        # work on a copy: the list may grow while this pass is running
        registered = list(self.network_metrics)

        # group metrics by dpid to optimize the rest api calls
        by_dpid = {}
        for metric_dict in registered:
            by_dpid.setdefault(int(metric_dict['switch_dpid']), []).append(metric_dict)

        for dpid, metric_list in by_dpid.items():
            ret = self.REST_cmd('stats/port', dpid)
            port_stat_dict = ast.literal_eval(ret)
            for metric_dict in metric_list:
                self.set_network_metric(metric_dict, port_stat_dict)

    # call this function repeatedly for streaming measurements
    def set_network_metric(self, metric_dict, port_stat_dict):
        """Publish one metric and compute its rate since the previous call.

        :param metric_dict: entry of ``self.network_metrics``; its
            ``previous_measurement`` and ``previous_monitor_time`` values are
            updated in place
        :param port_stat_dict: parsed ``stats/port`` reply, a dict that maps
            the dpid (as string) to a list of per-port statistics
        :return: the counter increase per second of port uptime; None when
            only a baseline could be stored (first measurement, or the port
            uptime did not advance); an error message when the port or the
            counter is missing from the statistics
        """
        metric_key = metric_dict['metric_key']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        mon_port = metric_dict['mon_port']

        for port_stat in port_stat_dict.get(str(metric_dict['switch_dpid']), []):
            try:
                if int(port_stat['port_no']) != int(mon_port):
                    continue
            except (TypeError, ValueError):
                # NOTE(review): some Ryu versions appear to report reserved
                # ports by name (e.g. "LOCAL"); skip anything non-numeric
                continue
            if metric_key not in port_stat:
                break

            # port uptime in seconds (duration_nsec is in nanoseconds)
            port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
            this_measurement = int(port_stat[metric_key])

            # set prometheus metric
            if metric_key in self.prom_metrics:
                self.prom_metrics[metric_key].labels(vnf_name, vnf_interface).set(this_measurement)

            previous_measurement = metric_dict['previous_measurement']
            previous_monitor_time = metric_dict['previous_monitor_time']
            metric_dict['previous_measurement'] = this_measurement
            metric_dict['previous_monitor_time'] = port_uptime

            if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:
                # first measurement, or the uptime did not advance: there is
                # no interval to compute a rate over, the next call has one
                return None

            time_delta = port_uptime - previous_monitor_time
            return (this_measurement - previous_measurement) / float(time_delta)

        msg = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
        logging.error(msg)
        return msg

    # call this function repeatedly for streaming measurements
    def get_rate(self, vnf_name, vnf_interface=None, direction='tx', metric='packets'):
        """Measure the rate of the single metric described by the instance.

        The monitored port is taken from ``self.switch_dpid``,
        ``self.mon_port``, ``self.vnf_name`` and ``self.vnf_interface``;
        ``vnf_name`` and ``vnf_interface`` are accepted for backward
        compatibility only.  The counter is ``self.metric_key`` or, when that
        is None, ``'<direction>_<metric>'``.

        :return: the counter increase per second; when no baseline exists yet
            the call waits one second and samples once more, and returns None
            if there is still no usable interval; an error message when the
            port or the counter is missing from the statistics
        """
        import time  # local import: only this code path needs to sleep

        key = self.metric_key
        if key is None:
            key = '{0}_{1}'.format(direction, metric)

        state = {
            'metric_key': key,
            'switch_dpid': self.switch_dpid,
            'vnf_name': self.vnf_name,
            'vnf_interface': self.vnf_interface,
            'mon_port': self.mon_port,
            'previous_measurement': self.previous_measurement,
            'previous_monitor_time': self.previous_monitor_time,
        }

        rate = None
        # at most two samples: a baseline and, one second later, the rate
        for attempt in range(2):
            if attempt:
                time.sleep(1)
            ret = self.REST_cmd('stats/port', self.switch_dpid)
            rate = self.set_network_metric(state, ast.literal_eval(ret))
            self.previous_measurement = state['previous_measurement']
            self.previous_monitor_time = state['previous_monitor_time']
            if rate is not None:
                break
        return rate

    def REST_cmd(self, prefix, dpid):
        """Send a GET request to the Ryu REST API and return the reply body.

        :param prefix: API path below the base URL, e.g. 'stats/port'
        :param dpid: datapath id appended to the path
        :return: the unparsed body of the HTTP response
        """
        url = self.REST_api + '/' + str(prefix) + '/' + str(dpid)
        req = urllib2.Request(url)
        response = urllib2.urlopen(req)
        try:
            return response.read()
        finally:
            # release the connection even when reading fails
            response.close()