SDN chaining now via ryu rest api
[osm/vim-emu.git] / src / emuvim / dcemulator / monitoring.py
1 __author__ = 'Administrator'
2
3 import urllib2
4 import logging
5 from mininet.node import OVSSwitch
6 import ast
7 import time
8 from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \
9 pushadd_to_gateway, push_to_gateway, delete_from_gateway
10 import threading
11 from subprocess import Popen, PIPE
12 import os
13
14 import paramiko
15 import gevent
16
17 logging.basicConfig(level=logging.INFO)
18
19 """
20 class to read openflow stats from the Ryu controller of the DCNetwork
21 """
22
class DCNetworkMonitor(object):
    """Export OpenFlow port and flow counters of a DCNetwork to Prometheus.

    Two background threads poll the Ryu controller once per second through
    ``net.ryu_REST``: one for per-port counters (``network_metrics``) and
    one for per-flow counters (``flow_metrics``).  Every sample is written
    to a Prometheus gauge and pushed to the Pushgateway.  The constructor
    also launches the Pushgateway, Prometheus and cAdvisor docker containers;
    ``stop()`` tears everything down again.

    Each entry of ``network_metrics`` / ``flow_metrics`` is a dict with the
    keys ``switch_dpid``, ``vnf_name``, ``vnf_interface``,
    ``previous_measurement``, ``previous_monitor_time``, ``metric_key`` and
    ``mon_port`` (flow entries additionally carry ``cookie``).
    """

    # single Pushgateway job under which all SDN controller metrics are pushed
    PUSHGATEWAY_JOB = 'sonemu-SDNcontroller'

    def __init__(self, net):
        """Create the gauges, start both polling threads and the containers.

        :param net: the emulated network; must offer ``DCNetwork_graph``,
            ``getNodeByName()`` and ``ryu_REST()``.
        """
        self.net = net

        prometheus_ip = '0.0.0.0'
        prometheus_port = '9090'
        self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)

        # address of the Pushgateway the samples are pushed to
        self.pushgateway = 'localhost:9091'

        # supported Prometheus metrics, all kept in a private registry
        self.registry = CollectorRegistry()
        label_names = ['vnf_name', 'vnf_interface', 'flow_id']
        self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',
                                          label_names, registry=self.registry)
        self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',
                                          label_names, registry=self.registry)
        self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',
                                        label_names, registry=self.registry)
        self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',
                                        label_names, registry=self.registry)

        # metric key (as used by the setup_* methods) -> gauge
        self.prom_metrics = {'tx_packets': self.prom_tx_packet_count,
                             'rx_packets': self.prom_rx_packet_count,
                             'tx_bytes': self.prom_tx_byte_count,
                             'rx_bytes': self.prom_rx_byte_count}

        # lists of installed metrics, each guarded by its own lock
        self.monitor_lock = threading.Lock()
        self.monitor_flow_lock = threading.Lock()
        self.network_metrics = []
        self.flow_metrics = []

        # start monitoring threads
        self.start_monitoring = True
        self.monitor_thread = threading.Thread(target=self.get_network_metrics)
        self.monitor_thread.start()

        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)
        self.monitor_flow_thread.start()

        # helper tools
        self.pushgateway_process = self.start_PushGateway()
        self.prometheus_process = self.start_Prometheus()
        self.cadvisor_process = self.start_cadvisor()

    def _default_interface(self, vnf_name):
        """Return the ``src_port_id`` of the first link of the first neighbor."""
        graph = self.net.DCNetwork_graph
        connected_sw = graph.neighbors(vnf_name)[0]
        return graph[vnf_name][connected_sw][0]['src_port_id']

    def _find_monitor_port(self, vnf_name, vnf_interface):
        """Locate the switch port that ``vnf_name:vnf_interface`` is wired to.

        :return: ``(connected switch name, dst_port_nr)`` of the first link
            whose ``src_port_id`` equals ``vnf_interface``, or ``None`` when
            no link matches.
        """
        graph = self.net.DCNetwork_graph
        for connected_sw in graph.neighbors(vnf_name):
            link_dict = graph[vnf_name][connected_sw]
            for link in link_dict:
                if link_dict[link]['src_port_id'] == vnf_interface:
                    return connected_sw, link_dict[link]['dst_port_nr']
        return None

    def _push_metrics(self):
        """Push the current content of the registry to the Pushgateway."""
        pushadd_to_gateway(self.pushgateway, job=self.PUSHGATEWAY_JOB, registry=self.registry)

    def _clear_pushgateway(self):
        """Delete the whole monitoring job from the Pushgateway.

        Only grouping keys can be addressed on deletion, so individual label
        sets cannot be removed; whatever is still in the registry reappears
        with the next push.
        """
        delete_from_gateway(self.pushgateway, job=self.PUSHGATEWAY_JOB)

    # first set some parameters, before measurement can start
    def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):
        """Start exporting a counter of the flow(s) identified by ``cookie``.

        :param vnf_name: name of the vnf node in ``net.DCNetwork_graph``.
        :param vnf_interface: interface of the vnf; defaults to the first one.
        :param metric: one of the keys of ``self.prom_metrics``; ``None``
            means ``'tx_packets'``.
        :param cookie: flow cookie sent to Ryu to select the flow entries.
        :return: a status message, or ``None`` when the vnf is not attached
            to an OVS switch.
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            vnf_interface = self._default_interface(vnf_name)

        match = self._find_monitor_port(vnf_name, vnf_interface)
        if match is None or not match[0]:
            message = "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(message)
            return message
        vnf_switch, mon_port = match

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'
            # reject unknown keys here; they would otherwise only fail later
            # inside the polling thread
            if metric not in self.prom_metrics:
                message = 'metric {0} is not supported'.format(metric)
                logging.error(message)
                return message

            next_node = self.net.getNodeByName(vnf_switch)
            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            flow_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                # the dpid is handled as a hexadecimal string here
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
                'cookie': cookie,
            }

            with self.monitor_flow_lock:
                self.flow_metrics.append(flow_metric)

            message = 'Started monitoring flow:{3} {2} on {0}:{1}'.format(
                vnf_name, vnf_interface, metric, cookie)
            logging.info(message)
            return message

        except Exception as ex:
            logging.exception("setup_flow error.")
            return str(ex)

    # first set some parameters, before measurement can start
    def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):
        """Start exporting a port counter of ``vnf_name:vnf_interface``.

        :param vnf_name: name of the vnf node in ``net.DCNetwork_graph``.
        :param vnf_interface: interface of the vnf; defaults to the first one.
        :param metric: one of the keys of ``self.prom_metrics``; ``None``
            means ``'tx_packets'``.
        :return: a status message, or ``None`` when the vnf has no or more
            than one neighbor, or is not attached to an OVS switch.
        """
        # check if port is specified (vnf:port)
        if vnf_interface is None:
            # take first interface by default
            vnf_interface = self._default_interface(vnf_name)

        match = self._find_monitor_port(vnf_name, vnf_interface)
        if match is None:
            message = "vnf interface {0}:{1} not found!".format(vnf_name, vnf_interface)
            logging.error(message)
            return message
        mon_port = match[1]

        try:
            # default port direction to monitor
            if metric is None:
                metric = 'tx_packets'
            # reject unknown keys here; they would otherwise only fail later
            # inside the polling thread
            if metric not in self.prom_metrics:
                message = 'metric {0} is not supported'.format(metric)
                logging.error(message)
                return message

            vnf_switch = self.net.DCNetwork_graph.neighbors(str(vnf_name))

            if len(vnf_switch) > 1:
                logging.info("vnf: {0} has multiple ports".format(vnf_name))
                return
            elif len(vnf_switch) == 0:
                logging.info("vnf: {0} is not connected".format(vnf_name))
                return

            next_node = self.net.getNodeByName(vnf_switch[0])
            if not isinstance(next_node, OVSSwitch):
                logging.info("vnf: {0} is not connected to switch".format(vnf_name))
                return

            network_metric = {
                'vnf_name': vnf_name,
                'vnf_interface': vnf_interface,
                'mon_port': mon_port,
                'previous_measurement': 0,
                'previous_monitor_time': 0,
                # the dpid is handled as a hexadecimal string here
                'switch_dpid': int(str(next_node.dpid), 16),
                'metric_key': metric,
            }

            with self.monitor_lock:
                self.network_metrics.append(network_metric)

            message = 'Started monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
            logging.info(message)
            return message

        except Exception as ex:
            logging.exception("setup_metric error.")
            return str(ex)

    def stop_metric(self, vnf_name, vnf_interface=None, metric=None):
        """Stop exporting one port metric, or all metrics of a vnf.

        With both ``vnf_interface`` and ``metric`` left at ``None`` every
        monitored metric of ``vnf_name`` is removed; otherwise only the entry
        matching all three arguments is removed.

        :return: a status message, or ``None`` when nothing matched.
        """
        with self.monitor_lock:
            if vnf_interface is None and metric is None:
                # delete everything from this vnf
                remaining = [entry for entry in self.network_metrics
                             if entry['vnf_name'] != vnf_name]
                if len(remaining) == len(self.network_metrics):
                    return None
                # update in place so other references to the list stay valid
                self.network_metrics[:] = remaining

                for gauge in self.prom_metrics.values():
                    # iterate over a snapshot: remove() mutates the mapping
                    for labels in list(gauge._metrics):
                        if labels[0] == vnf_name:
                            # remove by the real label values, whatever the
                            # flow id of this label set is
                            gauge.remove(*labels)

                self._clear_pushgateway()
                logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))
                return 'Stopped monitoring: {0}'.format(vnf_name)

            for metric_dict in self.network_metrics:
                if metric_dict['vnf_name'] != vnf_name \
                        or metric_dict['vnf_interface'] != vnf_interface \
                        or metric_dict['metric_key'] != metric:
                    continue

                # safe although we are iterating: we return right afterwards
                self.network_metrics.remove(metric_dict)

                # port metrics carry the flow_id label value 'None'
                label_values = (vnf_name, vnf_interface, 'None')
                for gauge in self.prom_metrics.values():
                    if label_values in gauge._metrics:
                        gauge.remove(*label_values)

                self._clear_pushgateway()
                message = 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)
                logging.info(message)
                return message

        return None

    # get all metrics defined in the list and export it to Prometheus
    def get_flow_metrics(self):
        """Polling loop of the flow thread; runs until ``stop()`` is called.

        Once per second the flow statistics of every entry of
        ``flow_metrics`` are requested from Ryu and handed to
        ``set_flow_metric``.  A failure for one entry is logged and does not
        end the loop.
        """
        while self.start_monitoring:
            with self.monitor_flow_lock:
                for flow_dict in self.flow_metrics:
                    try:
                        data = {'cookie': flow_dict['cookie']}
                        if 'tx' in flow_dict['metric_key']:
                            data['match'] = {'in_port': flow_dict['mon_port']}
                        elif 'rx' in flow_dict['metric_key']:
                            data['out_port'] = flow_dict['mon_port']

                        # query Ryu
                        ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)
                        # NOTE(review): literal_eval only understands Python
                        # literals; a reply containing JSON true/false/null
                        # would be rejected
                        flow_stat_dict = ast.literal_eval(ret)

                        logging.info('received flow stat:{0} '.format(flow_stat_dict))
                        self.set_flow_metric(flow_dict, flow_stat_dict)
                    except Exception:
                        logging.exception('could not update flow metric {0} of {1}:{2}'.format(
                            flow_dict.get('metric_key'), flow_dict.get('vnf_name'),
                            flow_dict.get('vnf_interface')))
            time.sleep(1)

    def get_network_metrics(self):
        """Polling loop of the port thread; runs until ``stop()`` is called.

        Once per second the port statistics of every switch referenced in
        ``network_metrics`` are requested from Ryu (one request per switch)
        and handed to ``set_network_metric``.  A failure for one switch is
        logged and does not end the loop.
        """
        while self.start_monitoring:
            with self.monitor_lock:
                # group metrics by dpid to optimize the rest api calls
                dpid_set = set(entry['switch_dpid'] for entry in self.network_metrics)

                for dpid in dpid_set:
                    try:
                        # query Ryu
                        ret = self.net.ryu_REST('stats/port', dpid=dpid)
                        port_stat_dict = ast.literal_eval(ret)

                        for metric_dict in self.network_metrics:
                            if int(metric_dict['switch_dpid']) == int(dpid):
                                self.set_network_metric(metric_dict, port_stat_dict)
                    except Exception:
                        logging.exception('could not update port metrics of switch {0}'.format(dpid))
            time.sleep(1)

    # add metric to the list to export to Prometheus, parse the Ryu port-stats reply
    def set_network_metric(self, metric_dict, port_stat_dict):
        """Export one port counter and compute its rate since the last call.

        :param metric_dict: entry of ``network_metrics``; its
            ``previous_measurement`` / ``previous_monitor_time`` are updated.
        :param port_stat_dict: parsed Ryu port-stats reply, keyed by the dpid
            as string.
        :return: the rate per second since the previous sample, ``None`` for
            the first sample (or after the port uptime went backwards), or an
            error message when the monitored port is not in the reply.
        """
        # vnf tx is the datacenter switch rx and vice-versa
        metric_key = self.switch_tx_rx(metric_dict['metric_key'])
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        mon_port = metric_dict['mon_port']

        for port_stat in port_stat_dict[str(switch_dpid)]:
            if int(port_stat['port_no']) != int(mon_port):
                continue

            port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)
            this_measurement = int(port_stat[metric_key])

            # set prometheus metric
            self.prom_metrics[metric_dict['metric_key']].\
                labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\
                set(this_measurement)

            # 1 single monitor job for all metrics of the SDN controller
            self._push_metrics()

            previous_monitor_time = metric_dict['previous_monitor_time']
            if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:
                # first measurement: only a baseline, no rate yet.  The
                # polling loop that called us delivers the next sample.
                metric_rate = None
            else:
                time_delta = port_uptime - previous_monitor_time
                metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)

            metric_dict['previous_measurement'] = this_measurement
            metric_dict['previous_monitor_time'] = port_uptime
            return metric_rate

        message = 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)
        logging.error(message)
        return message

    def set_flow_metric(self, metric_dict, flow_stat_dict):
        """Export the byte or packet counter of the first matching flow entry.

        :param metric_dict: entry of ``flow_metrics``.
        :param flow_stat_dict: parsed Ryu flow-stats reply, keyed by the dpid
            as string.
        """
        metric_key = metric_dict['metric_key']
        switch_dpid = metric_dict['switch_dpid']
        vnf_name = metric_dict['vnf_name']
        vnf_interface = metric_dict['vnf_interface']
        cookie = metric_dict['cookie']

        flow_stats = flow_stat_dict.get(str(switch_dpid))
        if not flow_stats:
            # nothing matched the request, so there is nothing to export
            logging.warning('no flow stats for flow:{0} on {1}:{2}'.format(cookie, vnf_name, vnf_interface))
            return

        # TODO aggregate all found flow stats
        flow_stat = flow_stats[0]
        if 'bytes' in metric_key:
            counter = flow_stat['byte_count']
        elif 'packet' in metric_key:
            counter = flow_stat['packet_count']
        else:
            logging.error('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))
            return

        self.prom_metrics[metric_key]. \
            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}). \
            set(counter)
        self._push_metrics()

    def query_Prometheus(self, query):
        """Run an instant query against the Prometheus HTTP API.

        :param query: query expression; appended to the URL as given.
        :return: the ``value`` field of the first result, or ``None`` when
            the query did not succeed or produced no result.
        """
        url = self.prometheus_REST_api + '/' + 'api/v1/query?query=' + query
        req = urllib2.Request(url)
        ret = ast.literal_eval(urllib2.urlopen(req).read())
        if ret['status'] != 'success':
            return None
        try:
            return ret['data']['result'][0]['value']
        except (KeyError, IndexError, TypeError):
            # reply without data, or with an empty result list
            return None

    def start_Prometheus(self, port=9090):
        """Start the Prometheus container and return its docker client process.

        :param port: host port mapped to the container's port 9090.
        """
        # prometheus.yml configuration file is located in the same directory as this file
        config_dir = os.path.dirname(os.path.abspath(__file__))
        cmd = ["docker",
               "run",
               "--rm",
               "-p", "{0}:9090".format(port),
               "-v", "{0}/prometheus.yml:/etc/prometheus/prometheus.yml".format(config_dir),
               "-v", "{0}/profile.rules:/etc/prometheus/profile.rules".format(config_dir),
               "--name", "prometheus",
               "prom/prometheus"
               ]
        logging.info('Start Prometheus container {0}'.format(cmd))
        return Popen(cmd)

    def start_PushGateway(self, port=9091):
        """Start the Pushgateway container and return its docker client process.

        :param port: host port mapped to the container's port 9091.
        """
        cmd = ["docker",
               "run",
               "-d",
               "-p", "{0}:9091".format(port),
               "--name", "pushgateway",
               "prom/pushgateway"
               ]

        logging.info('Start Prometheus Push Gateway container {0}'.format(cmd))
        return Popen(cmd)

    def start_cadvisor(self, port=8090):
        """Start the cAdvisor container and return its docker client process.

        :param port: host port mapped to the container's port 8080.
        """
        cmd = ["docker",
               "run",
               "--rm",
               "--volume=/:/rootfs:ro",
               "--volume=/var/run:/var/run:rw",
               "--volume=/sys:/sys:ro",
               "--volume=/var/lib/docker/:/var/lib/docker:ro",
               "--publish={0}:8080".format(port),
               "--name=cadvisor",
               "google/cadvisor:latest"
               ]
        logging.info('Start cAdvisor container {0}'.format(cmd))
        return Popen(cmd)

    def _stop_helper(self, process, name):
        """Kill the docker client ``process`` and remove container ``name``."""
        if process is None:
            return
        logging.info('stopping {0} container'.format(name))
        process.terminate()
        process.kill()
        # reap the client process so it does not linger as a zombie
        process.wait()
        self._stop_container(name)

    def stop(self):
        """Stop both polling threads and remove the three helper containers."""
        # stop the monitoring threads
        self.start_monitoring = False
        self.monitor_thread.join()
        self.monitor_flow_thread.join()

        self._stop_helper(self.prometheus_process, 'prometheus')
        self._stop_helper(self.pushgateway_process, 'pushgateway')
        self._stop_helper(self.cadvisor_process, 'cadvisor')

    def switch_tx_rx(self, metric=''):
        """Return ``metric`` with ``tx`` and ``rx`` exchanged.

        When monitoring vnfs, the tx of the datacenter switch is actually the
        rx of the vnf, so the metric name has to be flipped to stay
        consistent with the vnf's point of view.  Names containing neither
        ``tx`` nor ``rx`` are returned unchanged.
        """
        if 'tx' in metric:
            return metric.replace('tx', 'rx')
        if 'rx' in metric:
            return metric.replace('rx', 'tx')
        return metric

    def _stop_container(self, name):
        """Run ``docker stop`` and then ``docker rm`` on container ``name``."""
        for action in ("stop", "rm"):
            Popen(["docker", action, name]).wait()

    def profile(self, mgmt_ip, rate, input_ip, vnf_uuid, username='root', password='root'):
        """Load a vnf with iperf traffic and report its CPU usage.

        An iperf UDP client is started over SSH on ``mgmt_ip`` (only when
        ``rate`` is positive), Prometheus is queried once per second for
        about 15 seconds, and finally the CPU rate over an 8 second window is
        read.  iperf is killed and the SSH connection closed in any case.

        :param mgmt_ip: address of the host on which iperf is started.
        :param rate: iperf bandwidth in Mbit/s.
        :param input_ip: iperf target address.
        :param vnf_uuid: docker id of the container whose CPU usage is read.
        :param username: SSH user name.
        :param password: SSH password.
        :return: a line of the form ``rate: <rate>Mbps; cpu_load: <load>%``.
        :raises TypeError: when Prometheus has no CPU sample for the container.
        """
        query_template = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'

        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(mgmt_ip, username=username, password=password)

        try:
            if rate > 0:
                iperf_cmd = 'iperf -c {0} -u -l18 -b{1}M -t1000 &'.format(input_ip, rate)
                ssh.exec_command(iperf_cmd)

            start_time = time.time()
            query_cpu = query_template.format(vnf_uuid, 1)
            while (time.time() - start_time) < 15:
                # the result is not used; the query is issued once per second
                self.query_Prometheus(query_cpu)
                gevent.sleep(0)
                time.sleep(1)

            query_cpu2 = query_template.format(vnf_uuid, 8)
            # query_Prometheus returns None when there is no sample
            cpu_load = float(self.query_Prometheus(query_cpu2)[1])
            output_line = 'rate: {1}Mbps; cpu_load: {0}%'.format(round(cpu_load * 100, 2), rate)
            logging.info(output_line)
            return output_line
        finally:
            try:
                stop_iperf = 'pkill -9 iperf'
                stdin, stdout, stderr = ssh.exec_command(stop_iperf)
                # wait for pkill to finish before the connection goes away
                stdout.channel.recv_exit_status()
            finally:
                ssh.close()
545