Merge branch 'master' of github.com:mpeuster/son-emu
[osm/vim-emu.git] / src / emuvim / dcemulator / monitoring.py
1 __author__ = 'Administrator'
2
3 import urllib2
4 import logging
5 from mininet.node import OVSSwitch
6 import ast
7 import time
8 from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
9 pushadd_to_gateway, push_to_gateway, delete_from_gateway
10 import threading
11 from subprocess import Popen, PIPE
12 import os
13
14 logging.basicConfig(level=logging.INFO)
15
16 """
17 class to read openflow stats from the Ryu controller of the DCNetwork
18 """
19
class DCNetworkMonitor(object):
    """Read OpenFlow port statistics from the Ryu controller of the DCNetwork.

    A background thread polls the Ryu REST API (``stats/port/<dpid>``) once
    per second for every switch that has at least one installed metric,
    stores the counters in Prometheus gauges and pushes the registry to a
    Prometheus push gateway.  The constructor also launches the helper
    docker containers (push gateway, Prometheus, cAdvisor); ``stop()``
    terminates the thread and removes those containers again.

    Each entry of ``self.network_metrics`` is a dict with these keys::

        switch_dpid            datapath id (int) of the monitored switch
        vnf_name               name of the monitored vnf
        vnf_interface          name of the monitored vnf interface
        previous_measurement   counter value of the previous poll
        previous_monitor_time  port uptime (s) at the previous poll
        metric_key             'tx_packets', 'rx_packets', 'tx_bytes' or
                               'rx_bytes' (from the vnf's point of view)
        mon_port               switch port number the vnf is attached to
    """

    def __init__(self, net):
        """Create the gauges, start the polling thread and the containers.

        :param net: the emulated network; must offer ``DCNetwork_graph``
            and ``getNodeByName`` (both are used by ``setup_metric``).
        """
        self.net = net
        # link to Ryu REST_API
        self.ip = '0.0.0.0'
        self.port = '8080'
        self.REST_api = 'http://{0}:{1}'.format(self.ip, self.port)

        # address of the Prometheus push gateway the gauges are pushed to
        self.pushgateway = 'localhost:9091'

        # supported Prometheus metrics, kept in a private registry so only
        # these gauges are pushed
        self.registry = CollectorRegistry()
        label_names = ['vnf_name', 'vnf_interface']
        self.prom_tx_packet_count = Gauge(
            'sonemu_tx_count_packets', 'Total number of packets sent',
            label_names, registry=self.registry)
        self.prom_rx_packet_count = Gauge(
            'sonemu_rx_count_packets', 'Total number of packets received',
            label_names, registry=self.registry)
        self.prom_tx_byte_count = Gauge(
            'sonemu_tx_count_bytes', 'Total number of bytes sent',
            label_names, registry=self.registry)
        self.prom_rx_byte_count = Gauge(
            'sonemu_rx_count_bytes', 'Total number of bytes received',
            label_names, registry=self.registry)

        # metric key (as passed to setup_metric) -> gauge
        self.prom_metrics = {
            'tx_packets': self.prom_tx_packet_count,
            'rx_packets': self.prom_rx_packet_count,
            'tx_bytes': self.prom_tx_byte_count,
            'rx_bytes': self.prom_rx_byte_count,
        }

        # list of installed metrics to monitor (see class docstring for the
        # layout of one entry); guarded by monitor_lock
        self.monitor_lock = threading.Lock()
        self.network_metrics = []

        # start monitoring thread
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        # helper tools
        self.pushgateway_process = self.start_PushGateway()
        self.prometheus_process = self.start_Prometheus()
        self.cadvisor_process = self.start_cadvisor()

    def _find_monitor_port(self, vnf_name, vnf_interface):
        """Return the switch port ('dst_port') linked to vnf_name:vnf_interface.

        Returns None when no link of the vnf has ``vnf_interface`` as its
        'src_port_id'.
        """
        graph = self.net.DCNetwork_graph
        for connected_sw in graph.neighbors(vnf_name):
            link_dict = graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    # found the right link and connected switch
                    return link_dict[link]['dst_port']
        return None

    # first set some parameters, before measurement can start
    def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
        """Install a metric so the polling thread starts exporting it.

        :param vnf_name: name of the vnf node in the network graph.
        :param vnf_interface: interface of the vnf to monitor; when None the
            first link to the first neighbour of the vnf is used.
        :param metric: key into ``self.prom_metrics``; None means
            'tx_packets'.
        :return: a status/error message string, or None when the vnf has
            several neighbours, none at all, or is not attached to an
            OVSSwitch (those cases are only logged).
        """
        network_metric = {}

        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            # list() keeps this working when neighbors() returns an iterator
            connected_sw = list(self.net.DCNetwork_graph.neighbors(vnf_name))[0]
            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]
            vnf_interface = link_dict[0]['src_port_id']

        network_metric['vnf_name'] = vnf_name
        network_metric['vnf_interface'] = vnf_interface

        mon_port = self._find_monitor_port(vnf_name, vnf_interface)
        if mon_port is None:
            # not inside an except block, so plain error() instead of exception()
            message = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(message)
            return message
        network_metric['mon_port'] = mon_port

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'

            vnf_switch = list(self.net.DCNetwork_graph.neighbors(str(vnf_name)))

            if len(vnf_switch) > 1:
                logging.info("vnf: {0} has multiple ports".format(vnf_name))
                return
            elif len(vnf_switch) == 0:
                logging.info("vnf: {0} is not connected".format(vnf_name))
                return
            else:
                vnf_switch = vnf_switch[0]
            next_node = self.net.getNodeByName(vnf_switch)

            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            network_metric['previous_measurement'] = 0
            network_metric['previous_monitor_time'] = 0

            # the dpid attribute is a hex string; Ryu addresses switches by int
            network_metric['switch_dpid'] = int(str(next_node.dpid), 16)
            network_metric['metric_key'] = metric

            with self.monitor_lock:
                self.network_metrics.append(network_metric)

            message = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
            logging.info(message)
            return message

        except Exception as ex:
            logging.exception("setup_metric error.")
            # str(ex) instead of the Python-2-only, deprecated ex.message
            return str(ex)

    def stop_metric(self, vnf_name, vnf_interface, metric):
        """Remove an installed metric and drop its exported values.

        The matching entry is removed from ``self.network_metrics``, the
        (vnf_name, vnf_interface) label set is removed from every gauge in
        the registry that holds it, and the whole 'sonemu-SDNcontroller' job
        is deleted from the push gateway.

        :return: a status message string, or None when no installed metric
            matches the three arguments.
        """
        # search and removal both happen under the lock, and the lock is
        # released even when the gateway/registry calls raise
        with self.monitor_lock:
            match = None
            for metric_dict in self.network_metrics:
                if metric_dict['vnf_name'] == vnf_name \
                        and metric_dict['vnf_interface'] == vnf_interface \
                        and metric_dict['metric_key'] == metric:
                    match = metric_dict
                    break

            if match is None:
                return None

            self.network_metrics.remove(match)

            # unregistering a gauge would drop all of its label sets, so
            # only the label set of this vnf interface is removed
            # NOTE: relies on private attributes of prometheus_client
            for collector in self.registry._collectors:
                logging.info('name:{0} labels:{1} metrics:{2}'.format(
                    collector._name, collector._labelnames, collector._metrics))
                logging.info('{0}'.format(collector._metrics.values()))
                if (vnf_name, vnf_interface) in collector._metrics:
                    logging.info('2 name:{0} labels:{1} metrics:{2}'.format(
                        collector._name, collector._labelnames, collector._metrics))
                    collector.remove(vnf_name, vnf_interface)

            # 1 single monitor job for all metrics of the SDN controller:
            # the push gateway can only delete by the grouping key used when
            # pushing, not by metric labels, so the complete job is deleted.
            # To remove metrics separately they would need their own
            # grouping key (and probably their own registry).
            delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')

        message = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
        logging.info(message)
        return message

    # get all metrics defined in the list and export it to Prometheus
    def get_network_metrics(self):
        """Polling loop of the monitoring thread.

        Runs until ``self.start_monitoring`` becomes False.  Once per second
        it queries the port statistics of every switch that has an installed
        metric and hands them to ``set_network_metric``.
        """
        while self.start_monitoring:
            # 'with' guarantees the lock is released on every path, so a
            # failing poll can never block setup_metric/stop_metric forever
            with self.monitor_lock:
                # group metrics by dpid to optimize the rest api calls
                dpid_set = set(metric_dict['switch_dpid']
                               for metric_dict in self.network_metrics)

                for dpid in dpid_set:
                    try:
                        # query Ryu
                        ret = self.REST_cmd('stats/port', dpid)
                        port_stat_dict = ast.literal_eval(ret)

                        for metric_dict in self.network_metrics:
                            if int(metric_dict['switch_dpid']) == int(dpid):
                                self.set_network_metric(metric_dict, port_stat_dict)
                    except Exception:
                        # thread boundary: log and retry on the next cycle
                        # instead of silently killing the monitoring thread
                        logging.exception('get_network_metrics error.')

            time.sleep(1)

    def _push_registry(self):
        """Push all gauges to the push gateway as one SDN-controller job."""
        # 1 single monitor job for all metrics of the SDN controller
        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)

    # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
    def set_network_metric(self, metric_dict, port_stat_dict):
        """Export one metric from a Ryu port-stats reply and compute its rate.

        :param metric_dict: one entry of ``self.network_metrics``; its
            'previous_measurement' and 'previous_monitor_time' are updated.
        :param port_stat_dict: parsed reply of ``stats/port/<dpid>``, a dict
            mapping ``str(dpid)`` to a list of per-port stat dicts.
        :return: the rate (counter delta per second of port uptime) since the
            previous poll; None for the first measurement, which only records
            the baseline; an error string when the monitored port is not in
            the reply.
        """
        # vnf tx is the datacenter switch rx and vice-versa
        metric_key = self.switch_tx_rx(metric_dict['metric_key'])
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        previous_monitor_time = metric_dict['previous_monitor_time']
        mon_port = metric_dict['mon_port']

        for port_stat in port_stat_dict[str(switch_dpid)]:
            if int(port_stat['port_no']) != int(mon_port):
                continue

            port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
            this_measurement = int(port_stat[metric_key])

            # set prometheus metric; label values are passed positionally in
            # the order the gauge declares them (vnf_name, vnf_interface)
            self.prom_metrics[metric_dict['metric_key']].\
                labels(vnf_name, vnf_interface).\
                set(this_measurement)
            self._push_registry()

            # no usable baseline yet: either nothing was measured before or
            # the port uptime did not advance (e.g. the port was re-created)
            first_measurement = (previous_monitor_time <= 0
                                 or previous_monitor_time >= port_uptime)

            metric_rate = None
            if not first_measurement:
                time_delta = port_uptime - previous_monitor_time
                metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)

            metric_dict['previous_measurement'] = this_measurement
            metric_dict['previous_monitor_time'] = port_uptime
            return metric_rate

        # not inside an except block, so plain error() instead of exception()
        message = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
        logging.error(message)
        return message

    def REST_cmd(self, prefix, dpid):
        """GET ``<REST_api>/<prefix>/<dpid>`` and return the raw response body."""
        url = self.REST_api + '/' + str(prefix) + '/' + str(dpid)
        req = urllib2.Request(url)
        response = urllib2.urlopen(req)
        try:
            return response.read()
        finally:
            # release the connection instead of waiting for garbage collection
            response.close()

    def start_Prometheus(self, port=9090):
        """Run the 'prom/prometheus' container; return the docker Popen handle.

        :param port: host port mapped to the container's port 9090.
        """
        # prometheus.yml configuration file is located in the same directory as this file
        config_dir = os.path.dirname(os.path.abspath(__file__))
        cmd = ["docker",
               "run",
               "--rm",
               "-p", "{0}:9090".format(port),
               "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(config_dir),
               "--name", "prometheus",
               "prom/prometheus"
               ]
        logging.info('Start Prometheus container {0}'.format(cmd))
        return Popen(cmd)

    def start_PushGateway(self, port=9091):
        """Run the 'prom/pushgateway' container detached; return the Popen handle.

        :param port: host port mapped to the container's port 9091.
        """
        cmd = ["docker",
               "run",
               "-d",
               "-p", "{0}:9091".format(port),
               "--name", "pushgateway",
               "prom/pushgateway"
               ]

        logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
        return Popen(cmd)

    def start_cadvisor(self, port=8090):
        """Run the 'google/cadvisor' container; return the docker Popen handle.

        :param port: host port published for the container's port 8080.
        """
        cmd = ["docker",
               "run",
               "--rm",
               "--volume=/:/rootfs:ro",
               "--volume=/var/run:/var/run:rw",
               "--volume=/sys:/sys:ro",
               "--volume=/var/lib/docker/:/var/lib/docker:ro",
               "--publish={0}:8080".format(port),
               "--name=cadvisor",
               "google/cadvisor:latest"
               ]
        logging.info('Start cAdvisor container {0}'.format(cmd))
        return Popen(cmd)

    def stop(self):
        """Stop the polling thread, then stop and remove the helper containers."""
        # stop the monitoring thread
        self.start_monitoring = False
        self.monitor_thread.join()

        helpers = (('prometheus', self.prometheus_process),
                   ('pushgateway', self.pushgateway_process),
                   ('cadvisor', self.cadvisor_process))
        for name, process in helpers:
            if process is not None:
                logging.info('stopping {0} container'.format(name))
                process.terminate()
                process.kill()
                self._stop_container(name)

    def switch_tx_rx(self, metric=''):
        """Swap 'tx' and 'rx' in a metric name.

        When monitoring vnfs, the tx of the datacenter switch is actually the
        rx of the vnf, so the metric name is changed to be consistent with
        the vnf rx or tx.  Names containing neither are returned unchanged.
        """
        if 'tx' in metric:
            metric = metric.replace('tx', 'rx')
        elif 'rx' in metric:
            metric = metric.replace('rx', 'tx')

        return metric

    def _stop_container(self, name):
        """Run ``docker stop`` and then ``docker rm`` for ``name``, waiting for each."""
        for action in ("stop", "rm"):
            Popen(["docker", action, name]).wait()
353
354