X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2Fvim-emu.git;a=blobdiff_plain;f=src%2Femuvim%2Ftest%2Funittests%2Ftest_sonata_dummy_gatekeeper.py;h=ca8c57e3a013bd0bf50967dc430bf91a0ae66e2d;hp=bfa9541d3c7b1e66a7c8cba7151523fc47b23265;hb=a37394aa31408b69c01c67727d2576bb488b9c3d;hpb=a6ce6f3c00d8d4ae7d378da7ddc6feccf9a38477 diff --git a/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py b/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py index bfa9541..ca8c57e 100755 --- a/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py +++ b/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py @@ -33,8 +33,9 @@ import os import unittest from emuvim.test.base import SimpleTestTopology from emuvim.api.sonata import SonataDummyGatekeeperEndpoint -from emuvim.api.sonata.dummygatekeeper import initialize_GK +from emuvim.api.sonata.dummygatekeeper import initialize_GK, parse_interface import mininet.clean +from ipaddress import ip_network PACKAGE_PATH = "misc/sonata-demo-service.son" @@ -55,9 +56,10 @@ class testSonataDummyGatekeeper(SimpleTestTopology): sdkg1.connectDatacenter(self.dc[1]) # run the dummy gatekeeper (in another thread, don't block) sdkg1.start() + time.sleep(3) # start Mininet network self.startNet() - time.sleep(1) + time.sleep(3) print "starting tests" # board package @@ -88,20 +90,43 @@ class testSonataDummyGatekeeper(SimpleTestTopology): self.assertEqual(len(self.dc[0].listCompute()), 2) # check connectivity by using ping ELAN_list=[] - for i in [0]: - for vnf in self.dc[i].listCompute(): - # check connection - p = self.net.ping([self.h[i], vnf]) - print p - self.assertTrue(p <= 0.0) - # check E LAN connection - network_list = vnf.getNetworkStatus() - mgmt_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == 'mgmt'] - self.assertTrue(len(mgmt_ip) > 0) - ip_address = mgmt_ip[0] - ELAN_list.append(ip_address) - print ip_address + # check E-Line connection, by checking the IP addresses + for link in self.net.deployed_elines: + vnf_src, intf_src, vnf_sap_docker_name = parse_interface(link['connection_points_reference'][0]) + print vnf_src, intf_src + src = self.net.getNodeByName(vnf_src) + if not src: + continue + network_list = src.getNetworkStatus() + src_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == intf_src][0] + src_mask = [intf['netmask'] for intf in network_list if intf['intf_name'] == intf_src][0] + + vnf_dst, intf_dst, vnf_sap_docker_name = parse_interface(link['connection_points_reference'][1]) + dst = self.net.getNodeByName(vnf_dst) + if not dst: + continue + network_list = dst.getNetworkStatus() + dst_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == intf_dst][0] + dst_mask = [intf['netmask'] for intf in network_list if intf['intf_name'] == intf_dst][0] + + print "src = {0}:{1} ip={2} ".format(vnf_src, intf_src, src_ip, src_mask) + print "dst = {0}:{1} ip={2} ".format(vnf_dst, intf_dst, dst_ip, dst_mask) + + # check if the E-Line IP's are in the same subnet + ret = ip_network(u'{0}'.format(src_ip, src_mask), strict=False)\ + .compare_networks(ip_network(u'{0}'.format(dst_ip, dst_mask),strict=False)) + self.assertTrue(ret == 0) + + + for vnf in self.dc[0].listCompute(): + # check E LAN connection + network_list = vnf.getNetworkStatus() + mgmt_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == 'mgmt'] + self.assertTrue(len(mgmt_ip) > 0) + ip_address = mgmt_ip[0] + ELAN_list.append(ip_address) + print ip_address # check ELAN connection by ping over the mgmt network (needs to be configured as ELAN in the test service) for vnf in self.dc[0].listCompute(): @@ -114,7 +139,8 @@ class testSonataDummyGatekeeper(SimpleTestTopology): test_ip_list = list(ELAN_list) test_ip_list.remove(ip_address) for ip in test_ip_list: - p = self.net.ping([vnf],manualdestip=ip) + # only take ip address, without netmask + p = self.net.ping([vnf],manualdestip=ip.split('/')[0]) print p self.assertTrue(p <= 0.0) @@ -122,7 +148,7 @@ class testSonataDummyGatekeeper(SimpleTestTopology): self.stopNet() initialize_GK() -# @unittest.skip("disabled") + #@unittest.skip("disabled") def test_GK_Api_stop_service(self): # create network self.createNet(ndatacenter=2, nhosts=2) @@ -136,9 +162,10 @@ class testSonataDummyGatekeeper(SimpleTestTopology): sdkg1.connectDatacenter(self.dc[1]) # run the dummy gatekeeper (in another thread, don't block) sdkg1.start() + time.sleep(3) # start Mininet network self.startNet() - time.sleep(1) + time.sleep(3) print "starting tests" # board package @@ -180,6 +207,8 @@ class testSonataDummyGatekeeper(SimpleTestTopology): self.stopNet() initialize_GK() + + #@unittest.skip("disabled") def test_GK_stress_service(self): # create network self.createNet(ndatacenter=2, nhosts=2) @@ -189,9 +218,10 @@ class testSonataDummyGatekeeper(SimpleTestTopology): sdkg1.connectDatacenter(self.dc[1]) # run the dummy gatekeeper (in another thread, don't block) sdkg1.start() + time.sleep(3) # start Mininet network self.startNet() - time.sleep(1) + time.sleep(3) print "starting tests" # board package