From 461941c38b3981b78fa9238bc5b4282b08c7bac3 Mon Sep 17 00:00:00 2001 From: stevenvanrossem Date: Tue, 10 May 2016 11:41:29 +0200 Subject: [PATCH] set chaining via VLANs --- src/emuvim/api/zerorpc/compute.py | 106 +++++--- src/emuvim/api/zerorpc/network.py | 27 ++ src/emuvim/cli/compute.py | 11 +- src/emuvim/cli/monitor.py | 33 ++- src/emuvim/dcemulator/monitoring.py | 232 ++++++++++++++++-- src/emuvim/dcemulator/net.py | 104 ++++---- src/emuvim/dcemulator/node.py | 4 + src/emuvim/dcemulator/prometheus.yml | 4 +- .../examples/monitoring_demo_topology.py | 6 +- 9 files changed, 423 insertions(+), 104 deletions(-) diff --git a/src/emuvim/api/zerorpc/compute.py b/src/emuvim/api/zerorpc/compute.py index 4815fa7..63786fa 100644 --- a/src/emuvim/api/zerorpc/compute.py +++ b/src/emuvim/api/zerorpc/compute.py @@ -9,6 +9,8 @@ import zerorpc import paramiko import ipaddress +import time +import gevent logging.basicConfig(level=logging.INFO) @@ -116,6 +118,7 @@ class MultiDatacenterApi(object): logging.exception("RPC error.") return ex.message + @zerorpc.stream def compute_profile(self, dc_label, compute_name, image, kwargs): # note: zerorpc does not support keyword arguments @@ -126,47 +129,95 @@ class MultiDatacenterApi(object): kwargs.get('command')) # start traffic source (with fixed ip addres, no use for now...) psrc_status = self.compute_action_start( dc_label, 'psrc', 'profile_source', [{'id':'output'}], None) + # start traffic sink (with fixed ip addres) + psink_status = self.compute_action_start(dc_label, 'psink', 'profile_sink', [{'id': 'input'}], None) # link vnf to traffic source DCNetwork = self.dcs.get(dc_label).net DCNetwork.setChain('psrc', compute_name, vnf_src_interface='output', vnf_dst_interface=kwargs.get('input'), - cmd='add-flow', weight=None) + cmd='add-flow', weight=None, bidirectional=True) + DCNetwork.setChain('psrc', compute_name, + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='add-flow', weight=None, + match='dl_type=0x0800,nw_proto=17,udp_dst=5001', + cookie=10) + DCNetwork.setChain( compute_name, 'psink', + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='add-flow', weight=None, bidirectional=True) + DCNetwork.setChain(compute_name, 'psink', + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='add-flow', weight=None, + match='dl_type=0x0800,nw_proto=17,udp_dst=5001', + cookie=11) ## SSM/SP tasks: # start traffic generation + ''' for nw in psrc_status.get('network'): if nw.get('intf_name') == 'output': psrc_output_ip = unicode(nw['ip']) break dummy_iperf_server_ip = ipaddress.IPv4Address(psrc_output_ip) + 1 - iperf_cmd = 'iperf -c {0} -u -l18 -b10M -t1000 &'.format(dummy_iperf_server_ip) - - psrc_mgmt_ip = psrc_status['docker_network'] - psrc_user='root' - psrc_passw='root' - - # use ssh login when starting command externally - ret = self.dcs.get(dc_label).containers.get('psrc').pexec(iperf_cmd) - logging.info(ret) - self.dcs.get(dc_label).containers.get('psrc').monitor() - - #ssh does not work when exectuted via zerorpc command - #psrc_mgmt_ip = '172.17.0.3' - #ssh = paramiko.SSHClient() - #ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - #ssh.connect(psrc_mgmt_ip, username='steven', password='test') - #ssh.connect(psrc_mgmt_ip, username='root', password='root') - - #iperf_cmd = 'iperf -c {0} -u -l18 -b10M -t1000'.format(dummy_iperf_server_ip) - #stdin, stdout, stderr = ssh.exec_command(iperf_cmd) - # get monitor data and analyze + ''' + for nw in psink_status.get('network'): + if nw.get('intf_name') == 'input': + psink_input_ip = nw['ip'] + break - # create table - ## VIM/dummy gatekeeper's tasks: - # remove vnfs and chain + # get monitor data and analyze + vnf_uuid = vnf_status['id'] + psrc_mgmt_ip = psrc_status['docker_network'] + # query rate + + #need to wait a bit before containers are fully up? + time.sleep(2) + + def generate(): + for rate in [0, 1, 2, 3]: + #logging.info('query:{0}'.format(query_cpu)) + + output_line = DCNetwork.monitor_agent.profile(psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid) + gevent.sleep(0) + yield output_line + + # query loss + + + # create table + + ## VIM/dummy gatekeeper's tasks: + # remove vnfs and chain + DCNetwork.setChain('psrc', compute_name, + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='del-flows', weight=None, bidirectional=True) + DCNetwork.setChain('psrc', compute_name, + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='del-flows', weight=None, + match='dl_type=0x0800,nw_proto=17,udp_dst=5001', + cookie=10) + DCNetwork.setChain(compute_name, 'psink', + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='del-flows', weight=None, bidirectional=True) + DCNetwork.setChain(compute_name, 'psink', + vnf_src_interface='output', + vnf_dst_interface=kwargs.get('input'), + cmd='del-flows', weight=None, + match='dl_type=0x0800,nw_proto=17,udp_dst=5001', + cookie=11) + self.compute_action_stop(dc_label, compute_name) + self.compute_action_stop(dc_label, 'psink') + self.compute_action_stop(dc_label, 'psrc') + + return generate() def datacenter_list(self): logging.debug("RPC CALL: datacenter list") @@ -184,9 +235,4 @@ class MultiDatacenterApi(object): logging.exception("RPC error.") return ex.message -''' -if __name__ == "__main__": - test = MultiDatacenterApi({}) - test.compute_profile('dc1','vnf1', 'image',network='',command='test',other='other') -''' diff --git a/src/emuvim/api/zerorpc/network.py b/src/emuvim/api/zerorpc/network.py index af11bc3..b5873ff 100644 --- a/src/emuvim/api/zerorpc/network.py +++ b/src/emuvim/api/zerorpc/network.py @@ -74,6 +74,7 @@ class DCNetworkApi(object): vnf_src_name, vnf_dst_name, vnf_src_interface=kwargs.get('vnf_src_interface'), vnf_dst_interface=kwargs.get('vnf_dst_interface'), + cmd='add-flow', weight=kwargs.get('weight'), match=kwargs.get('match'), bidirectional=kwargs.get('bidirectional'), @@ -122,5 +123,31 @@ class DCNetworkApi(object): logging.exception("RPC error.") return ex.message + # setup the flow metrics measurement + def setup_flow(self, vnf_name, vnf_interface, metric, cookie): + logging.debug("RPC CALL: setup flow") + try: + c = self.net.monitor_agent.setup_flow(vnf_name, vnf_interface, metric, cookie) + return c + except Exception as ex: + logging.exception("RPC error.") + return ex.message + + # do prometheus query + def prometheus(self, dc_label, vnf_name, vnf_interface, query): + logging.debug("RPC CALL: query prometheus") + vnf_status = self.net.dcs.get(dc_label).containers.get(vnf_name).getStatus() + uuid = vnf_status['id'] + query = query.replace('', uuid) + #if needed, replace interface id with emu-intfs name + # query = query.replace('', vnf_interface) + logging.info('query: {0}'.format(query)) + try: + c = self.net.monitor_agent.query_Prometheus(query) + return c + except Exception as ex: + logging.exception("RPC error.") + return ex.message + diff --git a/src/emuvim/cli/compute.py b/src/emuvim/cli/compute.py index 0cfb024..dcb499d 100755 --- a/src/emuvim/cli/compute.py +++ b/src/emuvim/cli/compute.py @@ -15,7 +15,7 @@ pp = pprint.PrettyPrinter(indent=4) class ZeroRpcClient(object): def __init__(self): - self.c = zerorpc.Client() + self.c = zerorpc.Client(heartbeat=None, timeout=120) #heartbeat=None, timeout=120 self.c.connect("tcp://127.0.0.1:4242") # TODO hard coded for now. we'll change this later self.cmds = {} @@ -90,13 +90,16 @@ class ZeroRpcClient(object): input=args.get("input"), output=args.get("output")) - r = self.c.compute_profile( + for output in self.c.compute_profile( args.get("datacenter"), args.get("name"), args.get("image"), params - ) - pp.pprint(r) + ): + print(output + '\n') + + #pp.pprint(r) + #print(r) def _create_dict(self, **kwargs): return kwargs diff --git a/src/emuvim/cli/monitor.py b/src/emuvim/cli/monitor.py index 3b667f7..123abe5 100755 --- a/src/emuvim/cli/monitor.py +++ b/src/emuvim/cli/monitor.py @@ -46,6 +46,27 @@ class ZeroRpcClient(object): args.get("metric")) pp.pprint(r) + def setup_flow(self, args): + vnf_name = self._parse_vnf_name(args.get("vnf_name")) + vnf_interface = self._parse_vnf_interface(args.get("vnf_name")) + r = self.c.setup_flow( + vnf_name, + vnf_interface, + args.get("metric"), + args.get("cookie")) + pp.pprint(r) + + def prometheus(self, args): + vnf_name = self._parse_vnf_name(args.get("vnf_name")) + vnf_interface = self._parse_vnf_interface(args.get("vnf_name")) + r = self.c.prometheus( + args.get("datacenter"), + vnf_name, + vnf_interface, + args.get("query")) + pp.pprint(r) + + def _parse_vnf_name(self, vnf_name_str): vnf_name = vnf_name_str.split(':')[0] return vnf_name @@ -58,7 +79,7 @@ class ZeroRpcClient(object): return vnf_interface -parser = argparse.ArgumentParser(description='son-emu network') +parser = argparse.ArgumentParser(description='son-emu monitor') parser.add_argument( "command", help="Action to be executed") @@ -68,7 +89,15 @@ parser.add_argument( parser.add_argument( "--metric", "-m", dest="metric", help="tx_bytes, rx_bytes, tx_packets, rx_packets") - +parser.add_argument( + "--cookie", "-c", dest="cookie", + help="flow cookie to monitor") +parser.add_argument( + "--query", "-q", dest="query", + help="prometheus query") +parser.add_argument( + "--datacenter", "-d", dest="datacenter", + help="Data center where the vnf is deployed") def main(argv): #print "This is the son-emu monitor CLI." diff --git a/src/emuvim/dcemulator/monitoring.py b/src/emuvim/dcemulator/monitoring.py index 6531beb..ab77c39 100755 --- a/src/emuvim/dcemulator/monitoring.py +++ b/src/emuvim/dcemulator/monitoring.py @@ -11,6 +11,9 @@ import threading from subprocess import Popen, PIPE import os +import paramiko +import gevent + logging.basicConfig(level=logging.INFO) """ @@ -21,9 +24,14 @@ class DCNetworkMonitor(): def __init__(self, net): self.net = net # link to Ryu REST_API - self.ip = '0.0.0.0' - self.port = '8080' - self.REST_api = 'http://{0}:{1}'.format(self.ip,self.port) + ryu_ip = '0.0.0.0' + ryu_port = '8080' + self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port) + prometheus_ip = '0.0.0.0' + prometheus_port = '9090' + self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port) + + # helper variables to calculate the metrics self.pushgateway = 'localhost:9091' @@ -32,13 +40,13 @@ class DCNetworkMonitor(): # supported Prometheus metrics self.registry = CollectorRegistry() self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent', - ['vnf_name', 'vnf_interface'], registry=self.registry) + ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry) self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received', - ['vnf_name', 'vnf_interface'], registry=self.registry) + ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry) self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent', - ['vnf_name', 'vnf_interface'], registry=self.registry) + ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry) self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received', - ['vnf_name', 'vnf_interface'], registry=self.registry) + ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry) self.prom_metrics={'tx_packets':self.prom_tx_packet_count, 'rx_packets':self.prom_rx_packet_count, 'tx_bytes':self.prom_tx_byte_count,'rx_bytes':self.prom_rx_byte_count} @@ -57,18 +65,84 @@ class DCNetworkMonitor(): } ''' self.monitor_lock = threading.Lock() + self.monitor_flow_lock = threading.Lock() self.network_metrics = [] + self.flow_metrics = [] # start monitoring thread self.start_monitoring = True self.monitor_thread = threading.Thread(target=self.get_network_metrics) self.monitor_thread.start() + self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics) + self.monitor_flow_thread.start() + # helper tools self.pushgateway_process = self.start_PushGateway() self.prometheus_process = self.start_Prometheus() self.cadvisor_process = self.start_cadvisor() + # first set some parameters, before measurement can start + def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0): + + flow_metric = {} + + # check if port is specified (vnf:port) + if vnf_interface is None: + # take first interface by default + connected_sw = self.net.DCNetwork_graph.neighbors(vnf_name)[0] + link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw] + vnf_interface = link_dict[0]['src_port_id'] + + flow_metric['vnf_name'] = vnf_name + flow_metric['vnf_interface'] = vnf_interface + + vnf_switch = None + for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name): + link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw] + for link in link_dict: + # logging.info("{0},{1}".format(link_dict[link],vnf_interface)) + if link_dict[link]['src_port_id'] == vnf_interface: + # found the right link and connected switch + # logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface)) + vnf_switch = connected_sw + flow_metric['mon_port'] = link_dict[link]['dst_port_nr'] + break + + if not vnf_switch: + logging.exception("vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)) + return "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface) + + try: + # default port direction to monitor + if metric is None: + metric = 'tx_packets' + + next_node = self.net.getNodeByName(vnf_switch) + + if not isinstance(next_node, OVSSwitch): + logging.info("vnf: {0} is not connected to switch".format(vnf_name)) + return + + flow_metric['previous_measurement'] = 0 + flow_metric['previous_monitor_time'] = 0 + + flow_metric['switch_dpid'] = int(str(next_node.dpid), 16) + flow_metric['metric_key'] = metric + flow_metric['cookie'] = cookie + + self.monitor_flow_lock.acquire() + self.flow_metrics.append(flow_metric) + self.monitor_flow_lock.release() + + logging.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)) + return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie) + + except Exception as ex: + logging.exception("setup_metric error.") + return ex.message + + # first set some parameters, before measurement can start def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'): @@ -139,9 +213,10 @@ class DCNetworkMonitor(): logging.exception("setup_metric error.") return ex.message - def stop_metric(self, vnf_name, vnf_interface, metric): + def stop_metric(self, vnf_name, vnf_interface=None, metric=None): for metric_dict in self.network_metrics: + #logging.info('start Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric_dict)) if metric_dict['vnf_name'] == vnf_name and metric_dict['vnf_interface'] == vnf_interface \ and metric_dict['metric_key'] == metric: @@ -154,7 +229,7 @@ class DCNetworkMonitor(): #self.registry.unregister(self.prom_metrics[metric_dict['metric_key']]) for collector in self.registry._collectors : - logging.info('name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics)) + #logging.info('name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics)) """ INFO:root:name:sonemu_rx_count_packets labels:('vnf_name', 'vnf_interface') @@ -165,11 +240,11 @@ class DCNetworkMonitor(): """ logging.info('{0}'.format(collector._metrics.values())) #if self.prom_metrics[metric_dict['metric_key']] - if (vnf_name, vnf_interface) in collector._metrics: + if (vnf_name, vnf_interface, 'None') in collector._metrics: logging.info('2 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics)) #collector._metrics = {} - collector.remove(vnf_name, vnf_interface) + collector.remove(vnf_name, vnf_interface, 'None') # set values to NaN, prometheus api currently does not support removal of metrics #self.prom_metrics[metric_dict['metric_key']].labels(vnf_name, vnf_interface).set(float('nan')) @@ -186,8 +261,51 @@ class DCNetworkMonitor(): logging.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)) return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric) + # delete everything from this vnf + elif metric_dict['vnf_name'] == vnf_name and vnf_interface is None and metric is None: + self.monitor_lock.acquire() + self.network_metrics.remove(metric_dict) + for collector in self.registry._collectors: + collector_dict = collector._metrics.copy() + for name, interface, id in collector_dict: + if name == vnf_name: + logging.info('3 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, + collector._metrics)) + collector.remove(name, interface, 'None') + + delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller') + self.monitor_lock.release() + logging.info('Stopped monitoring vnf: {0}'.format(vnf_name)) + return 'Stopped monitoring: {0}'.format(vnf_name) + # get all metrics defined in the list and export it to Prometheus + def get_flow_metrics(self): + while self.start_monitoring: + + self.monitor_flow_lock.acquire() + + for flow_dict in self.flow_metrics: + data = {} + + data['cookie'] = flow_dict['cookie'] + + if 'tx' in flow_dict['metric_key']: + data['match'] = {'in_port':flow_dict['mon_port']} + elif 'rx' in flow_dict['metric_key']: + data['out_port'] = flow_dict['mon_port'] + + + # query Ryu + ret = self.REST_cmd('stats/flow', flow_dict['switch_dpid'], data=data) + flow_stat_dict = ast.literal_eval(ret) + + logging.info('received flow stat:{0} '.format(flow_stat_dict)) + self.set_flow_metric(flow_dict, flow_stat_dict) + + self.monitor_flow_lock.release() + time.sleep(1) + def get_network_metrics(self): while self.start_monitoring: @@ -231,7 +349,7 @@ class DCNetworkMonitor(): # set prometheus metric self.prom_metrics[metric_dict['metric_key']].\ - labels({'vnf_name':vnf_name, 'vnf_interface':vnf_interface}).\ + labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\ set(this_measurement) #push_to_gateway(self.pushgateway, job='SDNcontroller', # grouping_key={'metric':metric_dict['metric_key']}, registry=self.registry) @@ -253,7 +371,7 @@ class DCNetworkMonitor(): else: time_delta = (port_uptime - metric_dict['previous_monitor_time']) metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta) - logging.info('metric: {0} rate:{1}'.format(metric_dict['metric_key'], metric_rate)) + #logging.info('metric: {0} rate:{1}'.format(metric_dict['metric_key'], metric_rate)) metric_dict['previous_measurement'] = this_measurement metric_dict['previous_monitor_time'] = port_uptime @@ -262,11 +380,65 @@ class DCNetworkMonitor(): logging.exception('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)) return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface) + def set_flow_metric(self, metric_dict, flow_stat_dict): + # vnf tx is the datacenter switch rx and vice-versa + #metric_key = self.switch_tx_rx(metric_dict['metric_key']) + metric_key = metric_dict['metric_key'] + switch_dpid = metric_dict['switch_dpid'] + vnf_name = metric_dict['vnf_name'] + vnf_interface = metric_dict['vnf_interface'] + previous_measurement = metric_dict['previous_measurement'] + previous_monitor_time = metric_dict['previous_monitor_time'] + cookie = metric_dict['cookie'] + + # TODO aggregate all found flow stats + flow_stat = flow_stat_dict[str(switch_dpid)][0] + if 'bytes' in metric_key: + counter = flow_stat['byte_count'] + elif 'packet' in metric_key: + counter = flow_stat['packet_count'] + + flow_uptime = flow_stat['duration_sec'] + flow_stat['duration_nsec'] * 10 ** (-9) - def REST_cmd(self, prefix, dpid): - url = self.REST_api + '/' + str(prefix) + '/' + str(dpid) + self.prom_metrics[metric_dict['metric_key']]. \ + labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}). \ + set(counter) + pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry) + + #logging.exception('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)) + #return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface) + + def REST_cmd(self, prefix, dpid, data=None): + url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid) + if data: + logging.info('POST: {0}'.format(str(data))) + req = urllib2.Request(url, str(data)) + else: + req = urllib2.Request(url) + + ret = urllib2.urlopen(req).read() + return ret + + def query_Prometheus(self, query): + ''' + escaped_chars='{}[]' + for old in escaped_chars: + new = '\{0}'.format(old) + query = query.replace(old, new) + ''' + url = self.prometheus_REST_api + '/' + 'api/v1/query?query=' + query + #logging.info('query:{0}'.format(url)) req = urllib2.Request(url) ret = urllib2.urlopen(req).read() + ret = ast.literal_eval(ret) + if ret['status'] == 'success': + #logging.info('return:{0}'.format(ret)) + try: + ret = ret['data']['result'][0]['value'] + except: + ret = None + else: + ret = None return ret def start_Prometheus(self, port=9090): @@ -314,6 +486,7 @@ class DCNetworkMonitor(): # stop the monitoring thread self.start_monitoring = False self.monitor_thread.join() + self.monitor_flow_thread.join() if self.prometheus_process is not None: logging.info('stopping prometheus container') @@ -354,4 +527,33 @@ class DCNetworkMonitor(): name] Popen(cmd).wait() + def profile(self, mgmt_ip, rate, input_ip, vnf_uuid ): + + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + #ssh.connect(mgmt_ip, username='steven', password='test') + ssh.connect(mgmt_ip, username='root', password='root') + + iperf_cmd = 'iperf -c {0} -u -l18 -b{1}M -t1000 &'.format(input_ip, rate) + if rate > 0: + stdin, stdout, stderr = ssh.exec_command(iperf_cmd) + + start_time = time.time() + query_cpu = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid, 1) + while (time.time() - start_time) < 15: + data = self.query_Prometheus(query_cpu) + # logging.info('rate: {1} data:{0}'.format(data, rate)) + gevent.sleep(0) + time.sleep(1) + + query_cpu2 = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid, 8) + cpu_load = float(self.query_Prometheus(query_cpu2)[1]) + output = 'rate: {1}Mbps; cpu_load: {0}%'.format(round(cpu_load * 100, 2), rate) + output_line = output + logging.info(output_line) + + stop_iperf = 'pkill -9 iperf' + stdin, stdout, stderr = ssh.exec_command(stop_iperf) + + return output_line diff --git a/src/emuvim/dcemulator/net.py b/src/emuvim/dcemulator/net.py index 36fc921..f7fafdf 100755 --- a/src/emuvim/dcemulator/net.py +++ b/src/emuvim/dcemulator/net.py @@ -57,6 +57,9 @@ class DCNetwork(Dockernet): # graph of the complete DC network self.DCNetwork_graph = nx.MultiDiGraph() + # initialize pool of vlan tags to setup the SDN paths + self.vlans = range(4096)[::-1] + # monitoring agent if monitor: self.monitor_agent = DCNetworkMonitor(self) @@ -221,25 +224,37 @@ class DCNetwork(Dockernet): CLI(self) # to remove chain do setChain( src, dst, cmd='del-flows') - def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, cmd='add-flow', - weight=None, **kwargs): + def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs): + cmd = kwargs.get('cmd') + if cmd == 'add-flow': + ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs) + if kwargs.get('bidirectional'): + return ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs) + + elif cmd == 'del-flows': # TODO: del-flow to be implemented + ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs) + if kwargs.get('bidirectional'): + return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs) + + else: + return "Command unknown" + + + def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs): - logging.info('vnf_src_if: {0}'.format(vnf_src_interface)) + # TODO: this needs to be cleaned up #check if port is specified (vnf:port) if vnf_src_interface is None: # take first interface by default connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0] link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw] vnf_src_interface = link_dict[0]['src_port_id'] - #logging.info('vnf_src_if: {0}'.format(vnf_src_interface)) for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name): link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw] for link in link_dict: - #logging.info("here1: {0},{1}".format(link_dict[link],vnf_src_interface)) if link_dict[link]['src_port_id'] == vnf_src_interface: # found the right link and connected switch - #logging.info("conn_sw: {2},{0},{1}".format(link_dict[link]['src_port_id'], vnf_src_interface, connected_sw)) src_sw = connected_sw src_sw_inport_nr = link_dict[link]['dst_port_nr'] @@ -263,23 +278,28 @@ class DCNetwork(Dockernet): # get shortest path - #path = nx.shortest_path(self.DCNetwork_graph, vnf_src_name, vnf_dst_name) try: # returns the first found shortest path # if all shortest paths are wanted, use: all_shortest_paths - path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=weight) + path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight')) except: logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)) return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name) logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path)) - #current_hop = vnf_src_name current_hop = src_sw switch_inport_nr = src_sw_inport_nr + # choose free vlan if path contains more than 1 switch + if len(path) > 1: + vlan = self.vlans.pop() + else: + vlan = None + for i in range(0,len(path)): current_node = self.getNodeByName(current_hop) + if path.index(current_hop) < len(path)-1: next_hop = path[path.index(current_hop)+1] else: @@ -300,52 +320,31 @@ class DCNetwork(Dockernet): switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr'] - #logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport_nr, switch_outport_nr)) - # set of entry via ovs-ofctl - # TODO use rest API of ryu to set flow entries to correct dpid - # TODO this only sets port in to out, no match, so this will give trouble when multiple services are deployed... - # TODO need multiple matches to do this (VLAN tags) + # set of entry via ovs-ofctl if isinstance( current_node, OVSSwitch ): - cookie = kwargs.get('cookie') - match_input = kwargs.get('match') - self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, match_input, cmd, cookie) - if kwargs.get('bidirectional'): - self._set_flow_entry_dpctl(current_node, switch_outport_nr, switch_inport_nr, match_input, cmd, cookie) - ''' - match = 'in_port=%s' % switch_inport_nr - #add additional match entries from the argument - match_input = kwargs.get('match') - #logging.info('match input:{0}'.format(match_input)) - if match_input: - s = ',' - match = s.join([match,match_input]) - - if cmd=='add-flow': - action = 'action=%s' % switch_outport_nr - s = ',' - ofcmd = s.join([match,action]) - elif cmd=='del-flows': - ofcmd = match - else: - ofcmd='' - - current_node.dpctl(cmd, ofcmd) - logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport_nr, - switch_outport_nr)) - ''' + kwargs['vlan'] = vlan + kwargs['path'] = path + kwargs['current_hop'] = current_hop + self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs) + # TODO set entry via Ryu REST api (in case emulator is running remote...) + # take first link between switches by default if isinstance( next_node, OVSSwitch ): switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr'] current_hop = next_hop return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name) - #return "destination node: {0} not reached".format(vnf_dst_name) - def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, match_input, cmd, cookie): + def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs): match = 'in_port=%s' % switch_inport_nr - # add additional match entries from the argument - #match_input = kwargs.get('match') - # logging.info('match input:{0}'.format(match_input)) + + cookie = kwargs.get('cookie') + match_input = kwargs.get('match') + cmd = kwargs.get('cmd') + path = kwargs.get('path') + current_hop = kwargs.get('current_hop') + vlan = kwargs.get('vlan') + s = ',' if cookie: cookie = 'cookie=%s' % cookie @@ -354,6 +353,15 @@ class DCNetwork(Dockernet): match = s.join([match, match_input]) if cmd == 'add-flow': action = 'action=%s' % switch_outport_nr + if vlan != None: + if path.index(current_hop) == 0: # first node + action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr) + match = '-O OpenFlow13 ' + match + elif path.index(current_hop) == len(path) - 1: # last node + match += ',dl_vlan=%s' % vlan + action = 'action=strip_vlan,output=%s' % switch_outport_nr + else: # middle nodes + match += ',dl_vlan=%s' % vlan ofcmd = s.join([match, action]) elif cmd == 'del-flows': ofcmd = match @@ -361,8 +369,8 @@ class DCNetwork(Dockernet): ofcmd = '' node.dpctl(cmd, ofcmd) - logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr, - switch_outport_nr)) + logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr, + switch_outport_nr, cmd)) # start Ryu Openflow controller as Remote Controller for the DCNetwork def startRyu(self): diff --git a/src/emuvim/dcemulator/node.py b/src/emuvim/dcemulator/node.py index ad33adf..3258a9f 100755 --- a/src/emuvim/dcemulator/node.py +++ b/src/emuvim/dcemulator/node.py @@ -183,6 +183,10 @@ class Datacenter(object): raise Exception("Container with name %s not found." % name) LOG.debug("Stopping compute instance %r in data center %r" % (name, str(self))) + # stop the monitored metrics + if self.net.monitor_agent is not None: + self.net.monitor_agent.stop_metric(name) + # call resource model and free resources if self._resource_model is not None: self._resource_model.free(self.containers[name]) diff --git a/src/emuvim/dcemulator/prometheus.yml b/src/emuvim/dcemulator/prometheus.yml index 2412f85..6b3867b 100755 --- a/src/emuvim/dcemulator/prometheus.yml +++ b/src/emuvim/dcemulator/prometheus.yml @@ -34,7 +34,7 @@ scrape_configs: - job_name: 'cAdvisor' # Override the global default and scrape targets from this job every 5 seconds. - scrape_interval: 5s + scrape_interval: 1s target_groups: - targets: ['172.17.0.1:8090'] @@ -42,7 +42,7 @@ scrape_configs: - job_name: 'PushGateway' # Override the global default and scrape targets from this job every 5 seconds. - scrape_interval: 5s + scrape_interval: 1s target_groups: - targets: ['172.17.0.1:9091'] diff --git a/src/emuvim/examples/monitoring_demo_topology.py b/src/emuvim/examples/monitoring_demo_topology.py index 8592d62..9737609 100755 --- a/src/emuvim/examples/monitoring_demo_topology.py +++ b/src/emuvim/examples/monitoring_demo_topology.py @@ -43,7 +43,7 @@ def create_topology1(): first prototype) """ dc1 = net.addDatacenter("datacenter1") - #dc2 = net.addDatacenter("datacenter2") + dc2 = net.addDatacenter("datacenter2") #dc3 = net.addDatacenter("long_data_center_name3") #dc4 = net.addDatacenter( # "datacenter4", @@ -60,7 +60,7 @@ def create_topology1(): to define you topology. These links can use Mininet's features to limit bw, add delay or jitter. """ - #net.addLink(dc1, dc2, delay="10ms") + net.addLink(dc1, dc2, delay="10ms") #net.addLink(dc1, dc2) #net.addLink("datacenter1", s1, delay="20ms") #net.addLink(s1, dc3) @@ -81,7 +81,7 @@ def create_topology1(): zapi1 = ZeroRpcApiEndpoint("0.0.0.0", 4242) # connect data centers to this endpoint zapi1.connectDatacenter(dc1) - #zapi1.connectDatacenter(dc2) + zapi1.connectDatacenter(dc2) #zapi1.connectDatacenter(dc3) #zapi1.connectDatacenter(dc4) # run API endpoint server (in another thread, don't block) -- 2.25.1