set chaining via VLANs
authorstevenvanrossem <steven.vanrossem@intec.ugent.be>
Tue, 10 May 2016 09:41:29 +0000 (11:41 +0200)
committerstevenvanrossem <steven.vanrossem@intec.ugent.be>
Tue, 10 May 2016 09:41:29 +0000 (11:41 +0200)
src/emuvim/api/zerorpc/compute.py
src/emuvim/api/zerorpc/network.py
src/emuvim/cli/compute.py
src/emuvim/cli/monitor.py
src/emuvim/dcemulator/monitoring.py
src/emuvim/dcemulator/net.py
src/emuvim/dcemulator/node.py
src/emuvim/dcemulator/prometheus.yml
src/emuvim/examples/monitoring_demo_topology.py

index 4815fa7..63786fa 100644 (file)
@@ -9,6 +9,8 @@ import zerorpc
 
 import paramiko
 import ipaddress
+import time
+import gevent
 
 logging.basicConfig(level=logging.INFO)
 
@@ -116,6 +118,7 @@ class MultiDatacenterApi(object):
             logging.exception("RPC error.")
             return ex.message
 
+    @zerorpc.stream
     def compute_profile(self, dc_label, compute_name, image, kwargs):
         # note: zerorpc does not support keyword arguments
 
@@ -126,47 +129,95 @@ class MultiDatacenterApi(object):
                                   kwargs.get('command'))
         # start traffic source (with fixed ip addres, no use for now...)
         psrc_status = self.compute_action_start( dc_label, 'psrc', 'profile_source', [{'id':'output'}], None)
+        # start traffic sink (with fixed ip addres)
+        psink_status = self.compute_action_start(dc_label, 'psink', 'profile_sink', [{'id': 'input'}], None)
         # link vnf to traffic source
         DCNetwork = self.dcs.get(dc_label).net
         DCNetwork.setChain('psrc', compute_name,
                            vnf_src_interface='output',
                            vnf_dst_interface=kwargs.get('input'),
-                           cmd='add-flow', weight=None)
+                           cmd='add-flow', weight=None, bidirectional=True)
+        DCNetwork.setChain('psrc', compute_name,
+                           vnf_src_interface='output',
+                           vnf_dst_interface=kwargs.get('input'),
+                           cmd='add-flow', weight=None,
+                           match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
+                           cookie=10)
+        DCNetwork.setChain( compute_name, 'psink',
+                           vnf_src_interface='output',
+                           vnf_dst_interface=kwargs.get('input'),
+                           cmd='add-flow', weight=None, bidirectional=True)
+        DCNetwork.setChain(compute_name, 'psink',
+                       vnf_src_interface='output',
+                       vnf_dst_interface=kwargs.get('input'),
+                       cmd='add-flow', weight=None,
+                       match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
+                       cookie=11)
 
         ## SSM/SP tasks:
         # start traffic generation
+        '''
         for nw in psrc_status.get('network'):
             if nw.get('intf_name') == 'output':
                 psrc_output_ip = unicode(nw['ip'])
                 break
         dummy_iperf_server_ip = ipaddress.IPv4Address(psrc_output_ip) + 1
-        iperf_cmd = 'iperf -c {0} -u -l18 -b10M -t1000 &'.format(dummy_iperf_server_ip)
-
-        psrc_mgmt_ip = psrc_status['docker_network']
-        psrc_user='root'
-        psrc_passw='root'
-
-        # use ssh login when starting command externally
-        ret = self.dcs.get(dc_label).containers.get('psrc').pexec(iperf_cmd)
-        logging.info(ret)
-        self.dcs.get(dc_label).containers.get('psrc').monitor()
-
-        #ssh does not work when exectuted via zerorpc command
-        #psrc_mgmt_ip = '172.17.0.3'
-        #ssh = paramiko.SSHClient()
-        #ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        #ssh.connect(psrc_mgmt_ip, username='steven', password='test')
-        #ssh.connect(psrc_mgmt_ip, username='root', password='root')
-
-        #iperf_cmd = 'iperf -c {0} -u -l18 -b10M -t1000'.format(dummy_iperf_server_ip)
-        #stdin, stdout, stderr = ssh.exec_command(iperf_cmd)
-        # get monitor data and analyze
+        '''
+        for nw in psink_status.get('network'):
+            if nw.get('intf_name') == 'input':
+                psink_input_ip = nw['ip']
+                break
 
-        # create table
 
-        ## VIM/dummy gatekeeper's tasks:
-        # remove vnfs and chain
+        # get monitor data and analyze
+        vnf_uuid = vnf_status['id']
+        psrc_mgmt_ip = psrc_status['docker_network']
 
+        # query rate
+
+        #need to wait a bit before containers are fully up?
+        time.sleep(2)
+
+        def generate():
+            for rate in [0, 1, 2, 3]:
+                #logging.info('query:{0}'.format(query_cpu))
+
+                output_line = DCNetwork.monitor_agent.profile(psrc_mgmt_ip, rate, psink_input_ip, vnf_uuid)
+                gevent.sleep(0)
+                yield output_line
+
+            # query loss
+
+
+            # create table
+
+            ## VIM/dummy gatekeeper's tasks:
+            # remove vnfs and chain
+            DCNetwork.setChain('psrc', compute_name,
+                               vnf_src_interface='output',
+                               vnf_dst_interface=kwargs.get('input'),
+                               cmd='del-flows', weight=None, bidirectional=True)
+            DCNetwork.setChain('psrc', compute_name,
+                               vnf_src_interface='output',
+                               vnf_dst_interface=kwargs.get('input'),
+                               cmd='del-flows', weight=None,
+                               match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
+                               cookie=10)
+            DCNetwork.setChain(compute_name, 'psink',
+                               vnf_src_interface='output',
+                               vnf_dst_interface=kwargs.get('input'),
+                               cmd='del-flows', weight=None, bidirectional=True)
+            DCNetwork.setChain(compute_name, 'psink',
+                               vnf_src_interface='output',
+                               vnf_dst_interface=kwargs.get('input'),
+                               cmd='del-flows', weight=None,
+                               match='dl_type=0x0800,nw_proto=17,udp_dst=5001',
+                               cookie=11)
+            self.compute_action_stop(dc_label, compute_name)
+            self.compute_action_stop(dc_label, 'psink')
+            self.compute_action_stop(dc_label, 'psrc')
+
+        return generate()
 
     def datacenter_list(self):
         logging.debug("RPC CALL: datacenter list")
@@ -184,9 +235,4 @@ class MultiDatacenterApi(object):
             logging.exception("RPC error.")
             return ex.message
 
-'''
-if __name__ == "__main__":
-    test = MultiDatacenterApi({})
-    test.compute_profile('dc1','vnf1', 'image',network='',command='test',other='other')
-'''
 
index af11bc3..b5873ff 100644 (file)
@@ -74,6 +74,7 @@ class DCNetworkApi(object):
                 vnf_src_name, vnf_dst_name,
                 vnf_src_interface=kwargs.get('vnf_src_interface'),
                 vnf_dst_interface=kwargs.get('vnf_dst_interface'),
+                cmd='add-flow',
                 weight=kwargs.get('weight'),
                 match=kwargs.get('match'),
                 bidirectional=kwargs.get('bidirectional'),
@@ -122,5 +123,31 @@ class DCNetworkApi(object):
             logging.exception("RPC error.")
             return ex.message
 
+    # setup the flow metrics measurement
+    def setup_flow(self, vnf_name, vnf_interface, metric, cookie):
+        logging.debug("RPC CALL: setup flow")
+        try:
+            c = self.net.monitor_agent.setup_flow(vnf_name, vnf_interface, metric, cookie)
+            return c
+        except Exception as ex:
+            logging.exception("RPC error.")
+            return ex.message
+
+    # do prometheus query
+    def prometheus(self, dc_label, vnf_name, vnf_interface, query):
+        logging.debug("RPC CALL: query prometheus")
+        vnf_status = self.net.dcs.get(dc_label).containers.get(vnf_name).getStatus()
+        uuid = vnf_status['id']
+        query = query.replace('<uuid>', uuid)
+        #if needed, replace interface id with emu-intfs name
+        # query = query.replace('<intf>', vnf_interface)
+        logging.info('query: {0}'.format(query))
+        try:
+            c = self.net.monitor_agent.query_Prometheus(query)
+            return c
+        except Exception as ex:
+            logging.exception("RPC error.")
+            return ex.message
+
 
 
index 0cfb024..dcb499d 100755 (executable)
@@ -15,7 +15,7 @@ pp = pprint.PrettyPrinter(indent=4)
 class ZeroRpcClient(object):
 
     def __init__(self):
-        self.c = zerorpc.Client()
+        self.c = zerorpc.Client(heartbeat=None, timeout=120) #heartbeat=None, timeout=120
         self.c.connect("tcp://127.0.0.1:4242")  # TODO hard coded for now. we'll change this later
         self.cmds = {}
 
@@ -90,13 +90,16 @@ class ZeroRpcClient(object):
             input=args.get("input"),
             output=args.get("output"))
 
-        r = self.c.compute_profile(
+        for output in self.c.compute_profile(
             args.get("datacenter"),
             args.get("name"),
             args.get("image"),
             params
-        )
-        pp.pprint(r)
+            ):
+            print(output + '\n')
+
+        #pp.pprint(r)
+        #print(r)
 
     def _create_dict(self, **kwargs):
         return kwargs
index 3b667f7..123abe5 100755 (executable)
@@ -46,6 +46,27 @@ class ZeroRpcClient(object):
             args.get("metric"))\r
         pp.pprint(r)\r
 \r
+    def setup_flow(self, args):\r
+        vnf_name = self._parse_vnf_name(args.get("vnf_name"))\r
+        vnf_interface = self._parse_vnf_interface(args.get("vnf_name"))\r
+        r = self.c.setup_flow(\r
+            vnf_name,\r
+            vnf_interface,\r
+            args.get("metric"),\r
+            args.get("cookie"))\r
+        pp.pprint(r)\r
+\r
+    def prometheus(self, args):\r
+        vnf_name = self._parse_vnf_name(args.get("vnf_name"))\r
+        vnf_interface = self._parse_vnf_interface(args.get("vnf_name"))\r
+        r = self.c.prometheus(\r
+            args.get("datacenter"),\r
+            vnf_name,\r
+            vnf_interface,\r
+            args.get("query"))\r
+        pp.pprint(r)\r
+\r
+\r
     def _parse_vnf_name(self, vnf_name_str):\r
         vnf_name = vnf_name_str.split(':')[0]\r
         return vnf_name\r
@@ -58,7 +79,7 @@ class ZeroRpcClient(object):
 \r
         return vnf_interface\r
 \r
-parser = argparse.ArgumentParser(description='son-emu network')\r
+parser = argparse.ArgumentParser(description='son-emu monitor')\r
 parser.add_argument(\r
     "command",\r
     help="Action to be executed")\r
@@ -68,7 +89,15 @@ parser.add_argument(
 parser.add_argument(\r
     "--metric", "-m", dest="metric",\r
     help="tx_bytes, rx_bytes, tx_packets, rx_packets")\r
-\r
+parser.add_argument(\r
+    "--cookie", "-c", dest="cookie",\r
+    help="flow cookie to monitor")\r
+parser.add_argument(\r
+    "--query", "-q", dest="query",\r
+    help="prometheus query")\r
+parser.add_argument(\r
+    "--datacenter", "-d", dest="datacenter",\r
+    help="Data center where the vnf is deployed")\r
 \r
 def main(argv):\r
     #print "This is the son-emu monitor CLI."\r
index 6531beb..ab77c39 100755 (executable)
@@ -11,6 +11,9 @@ import threading
 from subprocess import Popen, PIPE\r
 import os\r
 \r
+import paramiko\r
+import gevent\r
+\r
 logging.basicConfig(level=logging.INFO)\r
 \r
 """\r
@@ -21,9 +24,14 @@ class DCNetworkMonitor():
     def __init__(self, net):\r
         self.net = net\r
         # link to Ryu REST_API\r
-        self.ip = '0.0.0.0'\r
-        self.port = '8080'\r
-        self.REST_api = 'http://{0}:{1}'.format(self.ip,self.port)\r
+        ryu_ip = '0.0.0.0'\r
+        ryu_port = '8080'\r
+        self.ryu_REST_api = 'http://{0}:{1}'.format(ryu_ip, ryu_port)\r
+        prometheus_ip = '0.0.0.0'\r
+        prometheus_port = '9090'\r
+        self.prometheus_REST_api = 'http://{0}:{1}'.format(prometheus_ip, prometheus_port)\r
+\r
+\r
 \r
         # helper variables to calculate the metrics\r
         self.pushgateway = 'localhost:9091'\r
@@ -32,13 +40,13 @@ class DCNetworkMonitor():
         # supported Prometheus metrics\r
         self.registry = CollectorRegistry()\r
         self.prom_tx_packet_count = Gauge('sonemu_tx_count_packets', 'Total number of packets sent',\r
-                                          ['vnf_name', 'vnf_interface'], registry=self.registry)\r
+                                          ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)\r
         self.prom_rx_packet_count = Gauge('sonemu_rx_count_packets', 'Total number of packets received',\r
-                                          ['vnf_name', 'vnf_interface'], registry=self.registry)\r
+                                          ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)\r
         self.prom_tx_byte_count = Gauge('sonemu_tx_count_bytes', 'Total number of bytes sent',\r
-                                        ['vnf_name', 'vnf_interface'], registry=self.registry)\r
+                                        ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)\r
         self.prom_rx_byte_count = Gauge('sonemu_rx_count_bytes', 'Total number of bytes received',\r
-                                        ['vnf_name', 'vnf_interface'], registry=self.registry)\r
+                                        ['vnf_name', 'vnf_interface', 'flow_id'], registry=self.registry)\r
 \r
         self.prom_metrics={'tx_packets':self.prom_tx_packet_count, 'rx_packets':self.prom_rx_packet_count,\r
                            'tx_bytes':self.prom_tx_byte_count,'rx_bytes':self.prom_rx_byte_count}\r
@@ -57,18 +65,84 @@ class DCNetworkMonitor():
         }\r
         '''\r
         self.monitor_lock = threading.Lock()\r
+        self.monitor_flow_lock = threading.Lock()\r
         self.network_metrics = []\r
+        self.flow_metrics = []\r
 \r
         # start monitoring thread\r
         self.start_monitoring = True\r
         self.monitor_thread = threading.Thread(target=self.get_network_metrics)\r
         self.monitor_thread.start()\r
 \r
+        self.monitor_flow_thread = threading.Thread(target=self.get_flow_metrics)\r
+        self.monitor_flow_thread.start()\r
+\r
         # helper tools\r
         self.pushgateway_process = self.start_PushGateway()\r
         self.prometheus_process = self.start_Prometheus()\r
         self.cadvisor_process = self.start_cadvisor()\r
 \r
+    # first set some parameters, before measurement can start\r
+    def setup_flow(self, vnf_name, vnf_interface=None, metric='tx_packets', cookie=0):\r
+\r
+        flow_metric = {}\r
+\r
+        # check if port is specified (vnf:port)\r
+        if vnf_interface is None:\r
+            # take first interface by default\r
+            connected_sw = self.net.DCNetwork_graph.neighbors(vnf_name)[0]\r
+            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]\r
+            vnf_interface = link_dict[0]['src_port_id']\r
+\r
+        flow_metric['vnf_name'] = vnf_name\r
+        flow_metric['vnf_interface'] = vnf_interface\r
+\r
+        vnf_switch = None\r
+        for connected_sw in self.net.DCNetwork_graph.neighbors(vnf_name):\r
+            link_dict = self.net.DCNetwork_graph[vnf_name][connected_sw]\r
+            for link in link_dict:\r
+                # logging.info("{0},{1}".format(link_dict[link],vnf_interface))\r
+                if link_dict[link]['src_port_id'] == vnf_interface:\r
+                    # found the right link and connected switch\r
+                    # logging.info("{0},{1}".format(link_dict[link]['src_port_id'], vnf_source_interface))\r
+                    vnf_switch = connected_sw\r
+                    flow_metric['mon_port'] = link_dict[link]['dst_port_nr']\r
+                    break\r
+\r
+        if not vnf_switch:\r
+            logging.exception("vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface))\r
+            return "vnf switch of {0}:{1} not found!".format(vnf_name, vnf_interface)\r
+\r
+        try:\r
+            # default port direction to monitor\r
+            if metric is None:\r
+                metric = 'tx_packets'\r
+\r
+            next_node = self.net.getNodeByName(vnf_switch)\r
+\r
+            if not isinstance(next_node, OVSSwitch):\r
+                logging.info("vnf: {0} is not connected to switch".format(vnf_name))\r
+                return\r
+\r
+            flow_metric['previous_measurement'] = 0\r
+            flow_metric['previous_monitor_time'] = 0\r
+\r
+            flow_metric['switch_dpid'] = int(str(next_node.dpid), 16)\r
+            flow_metric['metric_key'] = metric\r
+            flow_metric['cookie'] = cookie\r
+\r
+            self.monitor_flow_lock.acquire()\r
+            self.flow_metrics.append(flow_metric)\r
+            self.monitor_flow_lock.release()\r
+\r
+            logging.info('Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie))\r
+            return 'Started monitoring flow:{3} {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric, cookie)\r
+\r
+        except Exception as ex:\r
+            logging.exception("setup_metric error.")\r
+            return ex.message\r
+\r
+\r
     # first set some parameters, before measurement can start\r
     def setup_metric(self, vnf_name, vnf_interface=None, metric='tx_packets'):\r
 \r
@@ -139,9 +213,10 @@ class DCNetworkMonitor():
             logging.exception("setup_metric error.")\r
             return ex.message\r
 \r
-    def stop_metric(self, vnf_name, vnf_interface, metric):\r
+    def stop_metric(self, vnf_name, vnf_interface=None, metric=None):\r
 \r
         for metric_dict in self.network_metrics:\r
+            #logging.info('start Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric_dict))\r
             if metric_dict['vnf_name'] == vnf_name and metric_dict['vnf_interface'] == vnf_interface \\r
                     and metric_dict['metric_key'] == metric:\r
 \r
@@ -154,7 +229,7 @@ class DCNetworkMonitor():
                 #self.registry.unregister(self.prom_metrics[metric_dict['metric_key']])\r
 \r
                 for collector in self.registry._collectors :\r
-                    logging.info('name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics))\r
+                    #logging.info('name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames, collector._metrics))\r
                     """\r
                     INFO:root:name:sonemu_rx_count_packets\r
                     labels:('vnf_name', 'vnf_interface')\r
@@ -165,11 +240,11 @@ class DCNetworkMonitor():
                     """\r
                     logging.info('{0}'.format(collector._metrics.values()))\r
                     #if self.prom_metrics[metric_dict['metric_key']]\r
-                    if (vnf_name, vnf_interface) in collector._metrics:\r
+                    if (vnf_name, vnf_interface, 'None') in collector._metrics:\r
                         logging.info('2 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames,\r
                                                                               collector._metrics))\r
                         #collector._metrics = {}\r
-                        collector.remove(vnf_name, vnf_interface)\r
+                        collector.remove(vnf_name, vnf_interface, 'None')\r
 \r
                 # set values to NaN, prometheus api currently does not support removal of metrics\r
                 #self.prom_metrics[metric_dict['metric_key']].labels(vnf_name, vnf_interface).set(float('nan'))\r
@@ -186,8 +261,51 @@ class DCNetworkMonitor():
                 logging.info('Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric))\r
                 return 'Stopped monitoring: {2} on {0}:{1}'.format(vnf_name, vnf_interface, metric)\r
 \r
+            # delete everything from this vnf\r
+            elif metric_dict['vnf_name'] == vnf_name and vnf_interface is None and metric is None:\r
+                self.monitor_lock.acquire()\r
+                self.network_metrics.remove(metric_dict)\r
+                for collector in self.registry._collectors:\r
+                    collector_dict = collector._metrics.copy()\r
+                    for name, interface, id in collector_dict:\r
+                        if name == vnf_name:\r
+                            logging.info('3 name:{0} labels:{1} metrics:{2}'.format(collector._name, collector._labelnames,\r
+                                                                           collector._metrics))\r
+                            collector.remove(name, interface, 'None')\r
+\r
+                delete_from_gateway(self.pushgateway, job='sonemu-SDNcontroller')\r
+                self.monitor_lock.release()\r
+                logging.info('Stopped monitoring vnf: {0}'.format(vnf_name))\r
+                return 'Stopped monitoring: {0}'.format(vnf_name)\r
+\r
 \r
     # get all metrics defined in the list and export it to Prometheus\r
+    def get_flow_metrics(self):\r
+        while self.start_monitoring:\r
+\r
+            self.monitor_flow_lock.acquire()\r
+\r
+            for flow_dict in self.flow_metrics:\r
+                data = {}\r
+\r
+                data['cookie'] = flow_dict['cookie']\r
+\r
+                if 'tx' in flow_dict['metric_key']:\r
+                    data['match'] = {'in_port':flow_dict['mon_port']}\r
+                elif 'rx' in flow_dict['metric_key']:\r
+                    data['out_port'] = flow_dict['mon_port']\r
+\r
+\r
+                # query Ryu\r
+                ret = self.REST_cmd('stats/flow', flow_dict['switch_dpid'], data=data)\r
+                flow_stat_dict = ast.literal_eval(ret)\r
+\r
+                logging.info('received flow stat:{0} '.format(flow_stat_dict))\r
+                self.set_flow_metric(flow_dict, flow_stat_dict)\r
+\r
+            self.monitor_flow_lock.release()\r
+            time.sleep(1)\r
+\r
     def get_network_metrics(self):\r
         while self.start_monitoring:\r
 \r
@@ -231,7 +349,7 @@ class DCNetworkMonitor():
 \r
                 # set prometheus metric\r
                 self.prom_metrics[metric_dict['metric_key']].\\r
-                    labels({'vnf_name':vnf_name, 'vnf_interface':vnf_interface}).\\r
+                    labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': None}).\\r
                     set(this_measurement)\r
                 #push_to_gateway(self.pushgateway, job='SDNcontroller',\r
                 #                grouping_key={'metric':metric_dict['metric_key']}, registry=self.registry)\r
@@ -253,7 +371,7 @@ class DCNetworkMonitor():
                 else:\r
                     time_delta = (port_uptime - metric_dict['previous_monitor_time'])\r
                     metric_rate = (this_measurement - metric_dict['previous_measurement']) / float(time_delta)\r
-                    logging.info('metric: {0} rate:{1}'.format(metric_dict['metric_key'], metric_rate))\r
+                    #logging.info('metric: {0} rate:{1}'.format(metric_dict['metric_key'], metric_rate))\r
 \r
                 metric_dict['previous_measurement'] = this_measurement\r
                 metric_dict['previous_monitor_time'] = port_uptime\r
@@ -262,11 +380,65 @@ class DCNetworkMonitor():
         logging.exception('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))\r
         return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)\r
 \r
+    def set_flow_metric(self, metric_dict, flow_stat_dict):\r
+        # vnf tx is the datacenter switch rx and vice-versa\r
+        #metric_key = self.switch_tx_rx(metric_dict['metric_key'])\r
+        metric_key = metric_dict['metric_key']\r
+        switch_dpid = metric_dict['switch_dpid']\r
+        vnf_name = metric_dict['vnf_name']\r
+        vnf_interface = metric_dict['vnf_interface']\r
+        previous_measurement = metric_dict['previous_measurement']\r
+        previous_monitor_time = metric_dict['previous_monitor_time']\r
+        cookie = metric_dict['cookie']\r
+\r
+        # TODO aggregate all found flow stats\r
+        flow_stat = flow_stat_dict[str(switch_dpid)][0]\r
+        if 'bytes' in  metric_key:\r
+            counter = flow_stat['byte_count']\r
+        elif 'packet' in metric_key:\r
+            counter = flow_stat['packet_count']\r
+\r
+        flow_uptime = flow_stat['duration_sec'] + flow_stat['duration_nsec'] * 10 ** (-9)\r
 \r
-    def REST_cmd(self, prefix, dpid):\r
-        url = self.REST_api + '/' + str(prefix) + '/' + str(dpid)\r
+        self.prom_metrics[metric_dict['metric_key']]. \\r
+            labels({'vnf_name': vnf_name, 'vnf_interface': vnf_interface, 'flow_id': cookie}). \\r
+            set(counter)\r
+        pushadd_to_gateway(self.pushgateway, job='sonemu-SDNcontroller', registry=self.registry)\r
+\r
+        #logging.exception('metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface))\r
+        #return 'metric {0} not found on {1}:{2}'.format(metric_key, vnf_name, vnf_interface)\r
+\r
+    def REST_cmd(self, prefix, dpid, data=None):\r
+        url = self.ryu_REST_api + '/' + str(prefix) + '/' + str(dpid)\r
+        if data:\r
+            logging.info('POST: {0}'.format(str(data)))\r
+            req = urllib2.Request(url, str(data))\r
+        else:\r
+            req = urllib2.Request(url)\r
+\r
+        ret = urllib2.urlopen(req).read()\r
+        return ret\r
+\r
+    def query_Prometheus(self, query):\r
+        '''\r
+        escaped_chars='{}[]'\r
+        for old in escaped_chars:\r
+            new = '\{0}'.format(old)\r
+            query = query.replace(old, new)\r
+        '''\r
+        url = self.prometheus_REST_api + '/' + 'api/v1/query?query=' + query\r
+        #logging.info('query:{0}'.format(url))\r
         req = urllib2.Request(url)\r
         ret = urllib2.urlopen(req).read()\r
+        ret = ast.literal_eval(ret)\r
+        if ret['status'] == 'success':\r
+            #logging.info('return:{0}'.format(ret))\r
+            try:\r
+                ret = ret['data']['result'][0]['value']\r
+            except:\r
+                ret = None\r
+        else:\r
+            ret = None\r
         return ret\r
 \r
     def start_Prometheus(self, port=9090):\r
@@ -314,6 +486,7 @@ class DCNetworkMonitor():
         # stop the monitoring thread\r
         self.start_monitoring = False\r
         self.monitor_thread.join()\r
+        self.monitor_flow_thread.join()\r
 \r
         if self.prometheus_process is not None:\r
             logging.info('stopping prometheus container')\r
@@ -354,4 +527,33 @@ class DCNetworkMonitor():
                name]\r
         Popen(cmd).wait()\r
 \r
+    def profile(self, mgmt_ip, rate, input_ip, vnf_uuid ):\r
+\r
+        ssh = paramiko.SSHClient()\r
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())\r
+        #ssh.connect(mgmt_ip, username='steven', password='test')\r
+        ssh.connect(mgmt_ip, username='root', password='root')\r
+\r
+        iperf_cmd = 'iperf -c {0} -u -l18 -b{1}M -t1000 &'.format(input_ip, rate)\r
+        if rate > 0:\r
+            stdin, stdout, stderr = ssh.exec_command(iperf_cmd)\r
+\r
+        start_time = time.time()\r
+        query_cpu = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid, 1)\r
+        while (time.time() - start_time) < 15:\r
+            data = self.query_Prometheus(query_cpu)\r
+            # logging.info('rate: {1} data:{0}'.format(data, rate))\r
+            gevent.sleep(0)\r
+            time.sleep(1)\r
+\r
+        query_cpu2 = '(sum(rate(container_cpu_usage_seconds_total{{id="/docker/{0}"}}[{1}s])))'.format(vnf_uuid, 8)\r
+        cpu_load = float(self.query_Prometheus(query_cpu2)[1])\r
+        output = 'rate: {1}Mbps; cpu_load: {0}%'.format(round(cpu_load * 100, 2), rate)\r
+        output_line = output\r
+        logging.info(output_line)\r
+\r
+        stop_iperf = 'pkill -9 iperf'\r
+        stdin, stdout, stderr = ssh.exec_command(stop_iperf)\r
+\r
+        return output_line\r
 \r
index 36fc921..f7fafdf 100755 (executable)
@@ -57,6 +57,9 @@ class DCNetwork(Dockernet):
         # graph of the complete DC network
         self.DCNetwork_graph = nx.MultiDiGraph()
 
+        # initialize pool of vlan tags to setup the SDN paths
+        self.vlans = range(4096)[::-1]
+
         # monitoring agent
         if monitor:
             self.monitor_agent = DCNetworkMonitor(self)
@@ -221,25 +224,37 @@ class DCNetwork(Dockernet):
         CLI(self)
 
     # to remove chain do setChain( src, dst, cmd='del-flows')
-    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, cmd='add-flow',
-                 weight=None, **kwargs):
+    def setChain(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
+        cmd = kwargs.get('cmd')
+        if cmd == 'add-flow':
+            ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
+            if kwargs.get('bidirectional'):
+                return ret +'\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
+
+        elif cmd == 'del-flows':  # TODO: del-flow to be implemented
+            ret = self._chainAddFlow(vnf_src_name, vnf_dst_name, vnf_src_interface, vnf_dst_interface, **kwargs)
+            if kwargs.get('bidirectional'):
+                return ret + '\n' + self._chainAddFlow(vnf_dst_name, vnf_src_name, vnf_dst_interface, vnf_src_interface, **kwargs)
+
+        else:
+            return "Command unknown"
+
+
+    def _chainAddFlow(self, vnf_src_name, vnf_dst_name, vnf_src_interface=None, vnf_dst_interface=None, **kwargs):
 
-        logging.info('vnf_src_if: {0}'.format(vnf_src_interface))
+        # TODO: this needs to be cleaned up
         #check if port is specified (vnf:port)
         if vnf_src_interface is None:
             # take first interface by default
             connected_sw = self.DCNetwork_graph.neighbors(vnf_src_name)[0]
             link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
             vnf_src_interface = link_dict[0]['src_port_id']
-            #logging.info('vnf_src_if: {0}'.format(vnf_src_interface))
 
         for connected_sw in self.DCNetwork_graph.neighbors(vnf_src_name):
             link_dict = self.DCNetwork_graph[vnf_src_name][connected_sw]
             for link in link_dict:
-                #logging.info("here1: {0},{1}".format(link_dict[link],vnf_src_interface))
                 if link_dict[link]['src_port_id'] == vnf_src_interface:
                     # found the right link and connected switch
-                    #logging.info("conn_sw: {2},{0},{1}".format(link_dict[link]['src_port_id'], vnf_src_interface, connected_sw))
                     src_sw = connected_sw
 
                     src_sw_inport_nr = link_dict[link]['dst_port_nr']
@@ -263,23 +278,28 @@ class DCNetwork(Dockernet):
 
 
         # get shortest path
-        #path = nx.shortest_path(self.DCNetwork_graph, vnf_src_name, vnf_dst_name)
         try:
             # returns the first found shortest path
             # if all shortest paths are wanted, use: all_shortest_paths
-            path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=weight)
+            path = nx.shortest_path(self.DCNetwork_graph, src_sw, dst_sw, weight=kwargs.get('weight'))
         except:
             logging.info("No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name))
             return "No path could be found between {0} and {1}".format(vnf_src_name, vnf_dst_name)
 
         logging.info("Path between {0} and {1}: {2}".format(vnf_src_name, vnf_dst_name, path))
 
-        #current_hop = vnf_src_name
         current_hop = src_sw
         switch_inport_nr = src_sw_inport_nr
 
+        # choose free vlan if path contains more than 1 switch
+        if len(path) > 1:
+            vlan = self.vlans.pop()
+        else:
+            vlan = None
+
         for i in range(0,len(path)):
             current_node = self.getNodeByName(current_hop)
+
             if path.index(current_hop) < len(path)-1:
                 next_hop = path[path.index(current_hop)+1]
             else:
@@ -300,52 +320,31 @@ class DCNetwork(Dockernet):
                 switch_outport_nr = self.DCNetwork_graph[current_hop][next_hop][index_edge_out]['src_port_nr']
 
 
-            #logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport_nr, switch_outport_nr))
-            # set of entry via ovs-ofctl
-            # TODO use rest API of ryu to set flow entries to correct dpid
-            # TODO this only sets port in to out, no match, so this will give trouble when multiple services are deployed...
-            # TODO need multiple matches to do this (VLAN tags)
+           # set of entry via ovs-ofctl
             if isinstance( current_node, OVSSwitch ):
-                cookie = kwargs.get('cookie')
-                match_input = kwargs.get('match')
-                self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, match_input, cmd, cookie)
-                if kwargs.get('bidirectional'):
-                    self._set_flow_entry_dpctl(current_node, switch_outport_nr, switch_inport_nr, match_input, cmd, cookie)
-                '''
-                match = 'in_port=%s' % switch_inport_nr
-                #add additional match entries from the argument
-                match_input = kwargs.get('match')
-                #logging.info('match input:{0}'.format(match_input))
-                if match_input:
-                    s = ','
-                    match = s.join([match,match_input])
-
-                if cmd=='add-flow':
-                    action = 'action=%s' % switch_outport_nr
-                    s = ','
-                    ofcmd = s.join([match,action])
-                elif cmd=='del-flows':
-                    ofcmd = match
-                else:
-                    ofcmd=''
-
-                current_node.dpctl(cmd, ofcmd)
-                logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(current_node.name, switch_inport_nr,
-                                                                                     switch_outport_nr))
-                '''
+                kwargs['vlan'] = vlan
+                kwargs['path'] = path
+                kwargs['current_hop'] = current_hop
+                self._set_flow_entry_dpctl(current_node, switch_inport_nr, switch_outport_nr, **kwargs)
+                # TODO set entry via Ryu REST api (in case emulator is running remote...)
+
             # take first link between switches by default
             if isinstance( next_node, OVSSwitch ):
                 switch_inport_nr = self.DCNetwork_graph[current_hop][next_hop][0]['dst_port_nr']
                 current_hop = next_hop
 
         return "path added between {0} and {1}".format(vnf_src_name, vnf_dst_name)
-        #return "destination node: {0} not reached".format(vnf_dst_name)
 
-    def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, match_input, cmd, cookie):
+    def _set_flow_entry_dpctl(self, node, switch_inport_nr, switch_outport_nr, **kwargs):
         match = 'in_port=%s' % switch_inport_nr
-        # add additional match entries from the argument
-        #match_input = kwargs.get('match')
-        # logging.info('match input:{0}'.format(match_input))
+
+        cookie = kwargs.get('cookie')
+        match_input = kwargs.get('match')
+        cmd = kwargs.get('cmd')
+        path = kwargs.get('path')
+        current_hop = kwargs.get('current_hop')
+        vlan = kwargs.get('vlan')
+
         s = ','
         if cookie:
             cookie = 'cookie=%s' % cookie
@@ -354,6 +353,15 @@ class DCNetwork(Dockernet):
             match = s.join([match, match_input])
         if cmd == 'add-flow':
             action = 'action=%s' % switch_outport_nr
+            if vlan != None:
+                if path.index(current_hop) == 0:  # first node
+                    action = ('action=mod_vlan_vid:%s' % vlan) + (',output=%s' % switch_outport_nr)
+                    match = '-O OpenFlow13 ' + match
+                elif path.index(current_hop) == len(path) - 1:  # last node
+                    match += ',dl_vlan=%s' % vlan
+                    action = 'action=strip_vlan,output=%s' % switch_outport_nr
+                else:  # middle nodes
+                    match += ',dl_vlan=%s' % vlan
             ofcmd = s.join([match, action])
         elif cmd == 'del-flows':
             ofcmd = match
@@ -361,8 +369,8 @@ class DCNetwork(Dockernet):
             ofcmd = ''
 
         node.dpctl(cmd, ofcmd)
-        logging.info("add flow in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
-                                                                                 switch_outport_nr))
+        logging.info("{3} in switch: {0} in_port: {1} out_port: {2}".format(node.name, switch_inport_nr,
+                                                                                 switch_outport_nr, cmd))
 
     # start Ryu Openflow controller as Remote Controller for the DCNetwork
     def startRyu(self):
index ad33adf..3258a9f 100755 (executable)
@@ -183,6 +183,10 @@ class Datacenter(object):
             raise Exception("Container with name %s not found." % name)
         LOG.debug("Stopping compute instance %r in data center %r" % (name, str(self)))
 
+        #  stop the monitored metrics
+        if self.net.monitor_agent is not None:
+            self.net.monitor_agent.stop_metric(name)
+
         # call resource model and free resources
         if self._resource_model is not None:
             self._resource_model.free(self.containers[name])
index 2412f85..6b3867b 100755 (executable)
@@ -34,7 +34,7 @@ scrape_configs:
   - job_name: 'cAdvisor'
 
     # Override the global default and scrape targets from this job every 5 seconds.
-    scrape_interval: 5s
+    scrape_interval: 1s
 
     target_groups:
       - targets: ['172.17.0.1:8090']
@@ -42,7 +42,7 @@ scrape_configs:
   - job_name: 'PushGateway'
 
     # Override the global default and scrape targets from this job every 5 seconds.
-    scrape_interval: 5s
+    scrape_interval: 1s
 
     target_groups:
       - targets: ['172.17.0.1:9091']
index 8592d62..9737609 100755 (executable)
@@ -43,7 +43,7 @@ def create_topology1():
         first prototype)
     """
     dc1 = net.addDatacenter("datacenter1")
-    #dc2 = net.addDatacenter("datacenter2")
+    dc2 = net.addDatacenter("datacenter2")
     #dc3 = net.addDatacenter("long_data_center_name3")
     #dc4 = net.addDatacenter(
     #    "datacenter4",
@@ -60,7 +60,7 @@ def create_topology1():
        to define you topology.
        These links can use Mininet's features to limit bw, add delay or jitter.
     """
-    #net.addLink(dc1, dc2, delay="10ms")
+    net.addLink(dc1, dc2, delay="10ms")
     #net.addLink(dc1, dc2)
     #net.addLink("datacenter1", s1, delay="20ms")
     #net.addLink(s1, dc3)
@@ -81,7 +81,7 @@ def create_topology1():
     zapi1 = ZeroRpcApiEndpoint("0.0.0.0", 4242)
     # connect data centers to this endpoint
     zapi1.connectDatacenter(dc1)
-    #zapi1.connectDatacenter(dc2)
+    zapi1.connectDatacenter(dc2)
     #zapi1.connectDatacenter(dc3)
     #zapi1.connectDatacenter(dc4)
     # run API endpoint server (in another thread, don't block)