import unittest
from emuvim.test.base import SimpleTestTopology
from emuvim.api.sonata import SonataDummyGatekeeperEndpoint
-from emuvim.api.sonata.dummygatekeeper import initialize_GK
+from emuvim.api.sonata.dummygatekeeper import initialize_GK, parse_interface
import mininet.clean
+from ipaddress import ip_network
PACKAGE_PATH = "misc/sonata-demo-service.son"
# @unittest.skip("disabled")
def test_GK_Api_start_service(self):
# create network
- self.createNet(nswitches=0, ndatacenter=2, nhosts=2, ndockers=0)
+ self.createNet(nswitches=0, ndatacenter=2, nhosts=2, ndockers=0, enable_learning=True)
# setup links
self.net.addLink(self.dc[0], self.h[0])
self.net.addLink(self.dc[0], self.dc[1])
sdkg1.connectDatacenter(self.dc[1])
# run the dummy gatekeeper (in another thread, don't block)
sdkg1.start()
+ time.sleep(3)
# start Mininet network
self.startNet()
time.sleep(1)
# check compute list result
self.assertEqual(len(self.dc[0].listCompute()), 2)
# check connectivity by using ping
+ ELAN_list=[]
+
+ # check E-Line connection, by checking the IP addresses
+ for link in self.net.deployed_elines:
+ vnf_src, intf_src, vnf_sap_docker_name = parse_interface(link['connection_points_reference'][0])
+ print vnf_src, intf_src
+ src = self.net.getNodeByName(vnf_src)
+ if not src:
+ continue
+ network_list = src.getNetworkStatus()
+ src_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == intf_src][0]
+ src_mask = [intf['netmask'] for intf in network_list if intf['intf_name'] == intf_src][0]
+
+ vnf_dst, intf_dst, vnf_sap_docker_name = parse_interface(link['connection_points_reference'][1])
+ dst = self.net.getNodeByName(vnf_dst)
+ if not dst:
+ continue
+ network_list = dst.getNetworkStatus()
+ dst_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == intf_dst][0]
+ dst_mask = [intf['netmask'] for intf in network_list if intf['intf_name'] == intf_dst][0]
+
+ print "src = {0}:{1} ip={2} ".format(vnf_src, intf_src, src_ip, src_mask)
+ print "dst = {0}:{1} ip={2} ".format(vnf_dst, intf_dst, dst_ip, dst_mask)
+
+ # check if the E-Line IP's are in the same subnet
+ ret = ip_network(u'{0}'.format(src_ip, src_mask), strict=False)\
+ .compare_networks(ip_network(u'{0}'.format(dst_ip, dst_mask),strict=False))
+ self.assertTrue(ret == 0)
+
+
+ for vnf in self.dc[0].listCompute():
+ # check E LAN connection
+ network_list = vnf.getNetworkStatus()
+ mgmt_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == 'mgmt']
+ self.assertTrue(len(mgmt_ip) > 0)
+ ip_address = mgmt_ip[0]
+ ELAN_list.append(ip_address)
+ print ip_address
+
+ # check ELAN connection by ping over the mgmt network (needs to be configured as ELAN in the test service)
for vnf in self.dc[0].listCompute():
- p = self.net.ping([self.h[0], vnf])
- self.assertTrue(p <= 50.0)
+ network_list = vnf.getNetworkStatus()
+ mgmt_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == 'mgmt']
+ self.assertTrue(len(mgmt_ip) > 0)
+ ip_address = mgmt_ip[0]
+ print ELAN_list
+ print ip_address
+ test_ip_list = list(ELAN_list)
+ test_ip_list.remove(ip_address)
+ for ip in test_ip_list:
+ # only take ip address, without netmask
+ p = self.net.ping([vnf],manualdestip=ip.split('/')[0])
+ print p
+ self.assertTrue(p <= 0.0)
+
# stop Mininet network
self.stopNet()
initialize_GK()
-# @unittest.skip("disabled")
+ #@unittest.skip("disabled")
def test_GK_Api_stop_service(self):
# create network
self.createNet(ndatacenter=2, nhosts=2)
sdkg1.connectDatacenter(self.dc[1])
# run the dummy gatekeeper (in another thread, don't block)
sdkg1.start()
+ time.sleep(3)
# start Mininet network
self.startNet()
time.sleep(1)
self.stopNet()
initialize_GK()
+
+ #@unittest.skip("disabled")
def test_GK_stress_service(self):
# create network
self.createNet(ndatacenter=2, nhosts=2)
sdkg1.connectDatacenter(self.dc[1])
# run the dummy gatekeeper (in another thread, don't block)
sdkg1.start()
+ time.sleep(3)
# start Mininet network
self.startNet()
time.sleep(1)