* [flask_restful](https://pypi.python.org/pypi/Flask-RESTful) >= 0.3 (BSD)
* [networkx](https://pypi.python.org/pypi/networkx/) >= 1.11 (BSD)
* [oslo.config](http://docs.openstack.org/developer/oslo.config/) >= 3.9.0 (Apache 2.0)
-* [paramiko](https://pypi.python.org/pypi/paramiko/1.16.0) >= 1.6 (LGPL)
* [prometheus_client](https://pypi.python.org/pypi/prometheus_client) >= 0.0.13 (Apache 2.0)
* [pyaml](https://pypi.python.org/pypi/pyaml) >=15.8.2 (WTFPL)
* [pytest-runner](https://pypi.python.org/pypi/pytest-runner) >= 2.8 (MIT)
- name: install prometheus_client
pip: name=prometheus_client state=latest
- - name: install paramiko
- pip: name=paramiko state=latest
-
- name: install latest urllib3 (fix error urllib3.connection.match_hostname = match_hostname)
pip: name=urllib3 state=latest
'docker-py==1.7.1',
'requests',
'prometheus_client',
- 'paramiko',
'urllib3'
],
zip_safe=False,
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
"""
+
+"""
+Distributed Cloud Emulator (dcemulator)
+Networking and monitoring functions
+(c) 2015 by Steven Van Rossem <steven.vanrossem@intec.ugent.be>
+"""
+
import logging
from flask_restful import Resource
from flask import request
acknowledge the contributions of their colleagues of the SONATA
partner consortium (www.sonata-nfv.eu).
"""
+
+"""
+Distributed Cloud Emulator (dcemulator)
+Networking and monitoring functions
+(c) 2015 by Steven Van Rossem <steven.vanrossem@intec.ugent.be>
+"""
+
import logging
from flask_restful import Resource
from flask import request
import threading
import zerorpc
-import paramiko
-import ipaddress
-import time
-import gevent
-
logging.basicConfig(level=logging.INFO)
logging.exception("RPC error.")
return ex.message
- @zerorpc.stream
- def compute_profile(self, dc_label, compute_name, kwargs):
- # note: zerorpc does not support keyword arguments
-
- ## VIM/dummy gatekeeper's tasks:
- # start vnf
- vnf_status = self.compute_action_start( dc_label, compute_name,
- kwargs.get('image'),
- kwargs.get('network'),
- kwargs.get('command'))
- # start traffic source (with fixed ip addres, no use for now...)
- psrc_status = self.compute_action_start( dc_label, 'psrc', 'profile_source', [{'id':'output'}], None)
- # start traffic sink (with fixed ip addres)
- psink_status = self.compute_action_start(dc_label, 'psink', 'profile_sink', [{'id': 'input'}], None)
- # link vnf to traffic source
- DCNetwork = self.dcs.get(dc_label).net
- DCNetwork.setChain('psrc', compute_name,
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='add-flow', weight=None, bidirectional=True)
- DCNetwork.setChain('psrc', compute_name,
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='add-flow', weight=None,
- match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
- cookie=10)
- DCNetwork.setChain( compute_name, 'psink',
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='add-flow', weight=None, bidirectional=True)
- DCNetwork.setChain(compute_name, 'psink',
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='add-flow', weight=None,
- match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
- cookie=11)
-
- ## SSM/SP tasks:
- # start traffic generation
- '''
- for nw in psrc_status.get('network'):
- if nw.get('intf_name') == 'output':
- psrc_output_ip = unicode(nw['ip'])
- break
- dummy_iperf_server_ip = ipaddress.IPv4Address(psrc_output_ip) + 1
- '''
- for nw in psink_status.get('network'):
- if nw.get('intf_name') == 'input':
- psink_input_ip = nw['ip']
- break
-
-
- # get monitor data and analyze
- vnf_uuid = vnf_status['id']
- psrc_mgmt_ip = psrc_status['docker_network']
-
- # query rate
-
- #need to wait a bit before containers are fully up?
- time.sleep(2)
-
- def generate():
- for rate in [0, 1, 2, 3]:
- #logging.info('query:{0}'.format(query_cpu))
-
- output_line = DCNetwork.monitor_agent.profile(psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid)
- gevent.sleep(0)
- yield output_line
-
- # query loss
-
-
- # create table
-
- ## VIM/dummy gatekeeper's tasks:
- # remove vnfs and chain
- DCNetwork.setChain('psrc', compute_name,
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='del-flows', weight=None, bidirectional=True)
- DCNetwork.setChain('psrc', compute_name,
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='del-flows', weight=None,
- match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
- cookie=10)
- DCNetwork.setChain(compute_name, 'psink',
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='del-flows', weight=None, bidirectional=True)
- DCNetwork.setChain(compute_name, 'psink',
- vnf_src_interface='output',
- vnf_dst_interface=kwargs.get('input'),
- cmd='del-flows', weight=None,
- match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
- cookie=11)
- self.compute_action_stop(dc_label, compute_name)
- self.compute_action_stop(dc_label, 'psink')
- self.compute_action_stop(dc_label, 'psrc')
-
- return generate()
-
def datacenter_list(self):
logging.debug("RPC CALL: datacenter list")
try:
"""
"""
Distributed Cloud Emulator (dcemulator)
+Networking and monitoring functions
+(c) 2015 by Steven Van Rossem <steven.vanrossem@intec.ugent.be>
"""
import logging
args.get("datacenter"), args.get("name"))
pp.pprint(r)
- def profile(self, args):
- nw_list = list()
- if args.get("network") is not None:
- nw_list = self._parse_network(args.get("network"))
-
- params = self._create_dict(
- network=nw_list,
- command=args.get("docker_command"),
- image=args.get("image"),
- input=args.get("input"),
- output=args.get("output"))
-
- for output in self.c.compute_profile(
- args.get("datacenter"),
- args.get("name"),
- params):
- print(output + '\n')
-
- #pp.pprint(r)
- #print(r)
-
def _create_dict(self, **kwargs):
return kwargs
parser = argparse.ArgumentParser(description='son-emu compute')
parser.add_argument(
"command",
- choices=['start', 'stop', 'list', 'status', 'profile'],
+ choices=['start', 'stop', 'list', 'status'],
help="Action to be executed.")
parser.add_argument(
"--datacenter", "-d", dest="datacenter",
"--net", dest="network",
help="Network properties of a compute instance e.g. \
'(id=input,ip=10.0.10.3/24),(id=output,ip=10.0.10.4/24)' for multiple interfaces.")
-parser.add_argument(
- "--input", "-in", dest="input",
- help="input interface of the vnf to profile")
-parser.add_argument(
- "--output", "-out", dest="output",
- help="output interface of the vnf to profile")
-
def main(argv):
args = vars(parser.parse_args(argv))
args.get("cookie"))\r
pp.pprint(r)\r
\r
- def prometheus_zrpc(self, args):\r
- vnf_name = self._parse_vnf_name(args.get("vnf_name"))\r
- vnf_interface = self._parse_vnf_interface(args.get("vnf_name"))\r
- r = self.c.prometheus(\r
- args.get("datacenter"),\r
- vnf_name,\r
- vnf_interface,\r
- args.get("query"))\r
- pp.pprint(r)\r
-\r
def prometheus(self, args):\r
+ # This functions makes it more user-friendly to create the correct prometheus query\r
+ # <uuid> is replaced by the correct uuid of the deployed vnf container\r
vnf_name = self._parse_vnf_name(args.get("vnf_name"))\r
vnf_interface = self._parse_vnf_interface(args.get("vnf_name"))\r
dc_label = args.get("datacenter")\r
partner consortium (www.sonata-nfv.eu).
"""
-#import urllib2
+
import requests
-#import ast
+
# set this to localhost for now
# this is correct for son-emu started outside of a container or as a container with net=host
-#TODO prometheus sdk DB is started outside of emulator, place these globals in an external SDK config file?
-prometheus_ip = '127.0.0.1'
+#TODO if prometheus sdk DB is started outside of emulator, place these globals in an external SDK config file?
+prometheus_ip = 'localhost'
+# when sdk is started with docker-compose, we could use
+# prometheus_ip = 'prometheus'
prometheus_port = '9090'
prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)
def query_Prometheus(query):
url = prometheus_REST_api + '/' + 'api/v1/query?query=' + query
# logging.info('query:{0}'.format(url))
- #req = urllib2.Request(url)
req = requests.get(url)
- #ret = urllib2.urlopen(req).read()
- #ret = ast.literal_eval(ret)
ret = req.json()
if ret['status'] == 'success':
# logging.info('return:{0}'.format(ret))
pp.pprint(response.json())
+ def prometheus(self, args):
+ # This functions makes it more user-friendly to create the correct prometheus query
+ # <uuid> is replaced by the correct uuid of the deployed vnf container
+ vnf_name = self._parse_vnf_name(args.get("vnf_name"))
+ vnf_interface = self._parse_vnf_interface(args.get("vnf_name"))
+ dc_label = args.get("datacenter")
+ query = args.get("query")
+ vnf_status = get("%s/restapi/compute/%s/%s" %
+ (args.get("endpoint"),
+ args.get("datacenter"),
+ vnf_name)).json()
+ uuid = vnf_status['id']
+ query = query.replace('<uuid>', uuid)
+
+ response = prometheus.query_Prometheus(query)
+ pp.pprint(response)
+
def _parse_vnf_name(self, vnf_name_str):
vnf_name = vnf_name_str.split(':')[0]
return vnf_name
partner consortium (www.sonata-nfv.eu).\r
"""\r
\r
-import urllib2\r
import logging\r
from mininet.node import OVSSwitch\r
import ast\r
from prometheus_client import start_http_server, Summary, Histogram, Gauge, Counter, REGISTRY, CollectorRegistry, \\r
pushadd_to_gateway, push_to_gateway, delete_from_gateway\r
import threading\r
-from subprocess import Popen, PIPE\r
+from subprocess import Popen\r
import os\r
\r
-import paramiko\r
-import gevent\r
\r
logging.basicConfig(level=logging.INFO)\r
\r
def __init__(self, net):\r
self.net = net\r
\r
+ # TODO: these global variables should be part of a config file?\r
+ '''\r
+ # prometheus is started outside of son-emu\r
prometheus_ip = '127.0.0.1'\r
prometheus_port = '9090'\r
self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)\r
-\r
-\r
-\r
+ '''\r
# helper variables to calculate the metrics\r
+ # pushgateway is started outside of son-emu and son-emu is started with net=host\r
+ # so localhost:9091 works\r
self.pushgateway = 'localhost:9091'\r
- # Start up the server to expose the metrics to Prometheus.\r
+ # when sdk is started with docker-compose, we could use\r
+ # self.pushgateway = 'pushgateway:9091'\r
+ # Start up the server to expose the metrics to Prometheus\r
#start_http_server(8000)\r
+\r
# supported Prometheus metrics\r
self.registry = CollectorRegistry()\r
self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',\r
self.monitor_flow_thread.start()\r
\r
# helper tools\r
+ # Prometheus pushgateway and DB are started as external contianer, outside of son-emu\r
#self.pushgateway_process = self.start_PushGateway()\r
#self.prometheus_process = self.start_Prometheus()\r
self.cadvisor_process = self.start_cadvisor()\r
for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):\r
link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]\r
for link in link_dict:\r
- # logging.info("{0},{1}".format(link_dict[link],vnf_interface))\r
if link_dict[link]['src_port_id'] == vnf_interface:\r
# found the right link and connected switch\r
- # logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface))\r
vnf_switch = connected_sw\r
flow_metric['mon_port'] = link_dict[link]['dst_port_nr']\r
break\r
\r
for collector in self.registry._collectors:\r
if (vnf_name, vnf_interface, cookie) in collector._metrics:\r
- #logging.info('2 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames,\r
- # collector._metrics))\r
collector.remove(vnf_name, vnf_interface, cookie)\r
\r
delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')\r
for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):\r
link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]\r
for link in link_dict:\r
- # logging.info("{0},{1}".format(link_dict[link],vnf_interface))\r
if link_dict[link]['src_port_id'] == vnf_interface:\r
# found the right link and connected switch\r
- # logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface))\r
network_metric['mon_port'] = link_dict[link]['dst_port_nr']\r
break\r
\r
def stop_metric(self, vnf_name, vnf_interface=None, metric=None):\r
\r
for metric_dict in self.network_metrics:\r
- #logging.info('start Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric_dict))\r
if metric_dict['vnf_name'] == vnf_name and metric_dict['vnf_interface'] == vnf_interface \\r
and metric_dict['metric_key'] == metric:\r
\r
#self.registry.unregister(self.prom_metrics[metric_dict['metric_key']])\r
\r
for collector in self.registry._collectors :\r
- #logging.info('name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics))\r
+\r
"""\r
INFO:root:name:sonemu_rx_count_packets\r
labels:('vnf_name', 'vnf_interface')\r
0x7f353447fd10 >}\r
"""\r
logging.info('{0}'.format(collector._metrics.values()))\r
- #if self.prom_metrics[metric_dict['metric_key']]\r
+\r
if (vnf_name, vnf_interface, 'None') in collector._metrics:\r
logging.info('2 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames,\r
collector._metrics))\r
- #collector._metrics = {}\r
collector.remove(vnf_name, vnf_interface, 'None')\r
\r
# set values to NaN, prometheus api currently does not support removal of metrics\r
ret = self.net.ryu_REST('stats/flow', dpid=flow_dict['switch_dpid'], data=data)\r
flow_stat_dict = ast.literal_eval(ret)\r
\r
- #logging.info('received flow stat:{0} '.format(flow_stat_dict))\r
+ logging.debug('received flow stat:{0} '.format(flow_stat_dict))\r
self.set_flow_metric(flow_dict, flow_stat_dict)\r
\r
self.monitor_flow_lock.release()\r
\r
metric_list = [metric_dict for metric_dict in self.network_metrics\r
if int(metric_dict['switch_dpid'])==int(dpid)]\r
- #logging.info('1set prom packets:{0} '.format(self.network_metrics))\r
+\r
for metric_dict in metric_list:\r
self.set_network_metric(metric_dict, port_stat_dict)\r
\r
if int(port_stat['port_no']) == int(mon_port):\r
port_uptime = port_stat['duration_sec'] + port_stat['duration_nsec'] * 10 ** (-9)\r
this_measurement = int(port_stat[metric_key])\r
- #logging.info('set prom packets:{0} {1}:{2}'.format(this_measurement, vnf_name, vnf_interface))\r
\r
# set prometheus metric\r
self.prom_metrics[metric_dict['metric_key']].\\r
labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\\r
set(this_measurement)\r
- #push_to_gateway(self.pushgateway, job='SDNcontroller',\r
- # grouping_key={'metric':metric_dict['metric_key']}, registry=self.registry)\r
\r
# 1 single monitor job for all metrics of the SDN controller\r
pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)\r
\r
+ # also the rate is calculated here, but not used for now\r
+ # (rate can be easily queried from prometheus also)\r
if previous_monitor_time <= 0 or previous_monitor_time >= port_uptime:\r
metric_dict['previous_measurement'] = int(port_stat[metric_key])\r
metric_dict['previous_monitor_time'] = port_uptime\r
# do first measurement\r
- #logging.info('first measurement')\r
time.sleep(1)\r
self.monitor_lock.release()\r
\r
else:\r
time_delta = (port_uptime - metric_dict['previous_monitor_time'])\r
metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)\r
- #logging.info('metric: {0} rate:{1}'.format(metric_dict['metric_key'], metric_rate))\r
\r
metric_dict['previous_measurement'] = this_measurement\r
metric_dict['previous_monitor_time'] = port_uptime\r
\r
def set_flow_metric(self, metric_dict, flow_stat_dict):\r
# vnf tx is the datacenter switch rx and vice-versa\r
- #metric_key = self.switch_tx_rx(metric_dict['metric_key'])\r
metric_key = metric_dict['metric_key']\r
switch_dpid = metric_dict['switch_dpid']\r
vnf_name = metric_dict['vnf_name']\r
set(counter)\r
pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)\r
\r
- #logging.exception('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))\r
- #return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)\r
-\r
- def query_Prometheus(self, query):\r
- '''\r
- escaped_chars='{}[]'\r
- for old in escaped_chars:\r
- new = '\{0}'.format(old)\r
- query = query.replace(old, new)\r
- '''\r
- url = self.prometheus_REST_api + '/' + 'api/v1/query?query=' + query\r
- #logging.info('query:{0}'.format(url))\r
- req = urllib2.Request(url)\r
- ret = urllib2.urlopen(req).read()\r
- ret = ast.literal_eval(ret)\r
- if ret['status'] == 'success':\r
- #logging.info('return:{0}'.format(ret))\r
- try:\r
- ret = ret['data']['result'][0]['value']\r
- except:\r
- ret = None\r
- else:\r
- ret = None\r
- return ret\r
\r
def start_Prometheus(self, port=9090):\r
# prometheus.yml configuration file is located in the same directory as this file\r
name]\r
Popen(cmd).wait()\r
\r
- def profile(self, mgmt_ip, rate, input_ip, vnf_uuid ):\r
-\r
- ssh = paramiko.SSHClient()\r
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())\r
- #ssh.connect(mgmt_ip, username='steven', password='test')\r
- ssh.connect(mgmt_ip, username='root', password='root')\r
-\r
- iperf_cmd = 'iperf -c {0} -u -l18 -b{1}M -t1000 &'.format(input_ip, rate)\r
- if rate > 0:\r
- stdin, stdout, stderr = ssh.exec_command(iperf_cmd)\r
-\r
- start_time = time.time()\r
- query_cpu = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid, 1)\r
- while (time.time() - start_time) < 15:\r
- data = self.query_Prometheus(query_cpu)\r
- # logging.info('rate: {1} data:{0}'.format(data, rate))\r
- gevent.sleep(0)\r
- time.sleep(1)\r
-\r
- query_cpu2 = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid, 8)\r
- cpu_load = float(self.query_Prometheus(query_cpu2)[1])\r
- output = 'rate: {1}Mbps; cpu_load: {0}%'.format(round(cpu_load * 100, 2), rate)\r
- output_line = output\r
- logging.info(output_line)\r
-\r
- stop_iperf = 'pkill -9 iperf'\r
- stdin, stdout, stderr = ssh.exec_command(stop_iperf)\r
-\r
- return output_line\r
-\r
if kwargs.get('bidirectional'):
ret = ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
- elif cmd == 'del-flows': # TODO: del-flow to be implemented
+ elif cmd == 'del-flows':
ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
if kwargs.get('bidirectional'):
ret = ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
- # TODO: this needs to be cleaned up
#check if port is specified (vnf:port)
if vnf_src_interface is None:
# take first interface by default