X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=src%2Femuvim%2Ftest%2Funittests%2Ftest_sonata_dummy_gatekeeper.py;h=277e75c07a42bf6ab3b4e07b0dd3f790b8d3847d;hb=99d1f61305b5031df25ace2f49cf22220bd71e6c;hp=bfa9541d3c7b1e66a7c8cba7151523fc47b23265;hpb=a6ce6f3c00d8d4ae7d378da7ddc6feccf9a38477;p=osm%2Fvim-emu.git diff --git a/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py b/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py index bfa9541..277e75c 100755 --- a/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py +++ b/src/emuvim/test/unittests/test_sonata_dummy_gatekeeper.py @@ -33,8 +33,9 @@ import os import unittest from emuvim.test.base import SimpleTestTopology from emuvim.api.sonata import SonataDummyGatekeeperEndpoint -from emuvim.api.sonata.dummygatekeeper import initialize_GK +from emuvim.api.sonata.dummygatekeeper import initialize_GK, parse_interface import mininet.clean +from ipaddress import ip_network PACKAGE_PATH = "misc/sonata-demo-service.son" @@ -88,20 +89,43 @@ class testSonataDummyGatekeeper(SimpleTestTopology): self.assertEqual(len(self.dc[0].listCompute()), 2) # check connectivity by using ping ELAN_list=[] - for i in [0]: - for vnf in self.dc[i].listCompute(): - # check connection - p = self.net.ping([self.h[i], vnf]) - print p - self.assertTrue(p <= 0.0) - # check E LAN connection - network_list = vnf.getNetworkStatus() - mgmt_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == 'mgmt'] - self.assertTrue(len(mgmt_ip) > 0) - ip_address = mgmt_ip[0] - ELAN_list.append(ip_address) - print ip_address + # check E-Line connection, by checking the IP addresses + for link in self.net.deployed_elines: + vnf_src, intf_src, vnf_sap_docker_name = parse_interface(link['connection_points_reference'][0]) + print vnf_src, intf_src + src = self.net.getNodeByName(vnf_src) + if not src: + continue + network_list = src.getNetworkStatus() + src_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == intf_src][0] + src_mask = [intf['netmask'] for intf in network_list if intf['intf_name'] == intf_src][0] + + vnf_dst, intf_dst, vnf_sap_docker_name = parse_interface(link['connection_points_reference'][1]) + dst = self.net.getNodeByName(vnf_dst) + if not dst: + continue + network_list = dst.getNetworkStatus() + dst_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == intf_dst][0] + dst_mask = [intf['netmask'] for intf in network_list if intf['intf_name'] == intf_dst][0] + + print "src = {0}:{1} ip={2}/{3} ".format(vnf_src, intf_src, src_ip, src_mask) + print "dst = {0}:{1} ip={2}/{3} ".format(vnf_dst, intf_dst, dst_ip, dst_mask) + + # check if the E-Line IP's are in the same subnet + ret = ip_network(u'{0}/{1}'.format(src_ip, src_mask), strict=False)\ + .compare_networks(ip_network(u'{0}/{1}'.format(dst_ip, dst_mask),strict=False)) + self.assertTrue(ret == 0) + + + for vnf in self.dc[0].listCompute(): + # check E LAN connection + network_list = vnf.getNetworkStatus() + mgmt_ip = [intf['ip'] for intf in network_list if intf['intf_name'] == 'mgmt'] + self.assertTrue(len(mgmt_ip) > 0) + ip_address = mgmt_ip[0] + ELAN_list.append(ip_address) + print ip_address # check ELAN connection by ping over the mgmt network (needs to be configured as ELAN in the test service) for vnf in self.dc[0].listCompute(): @@ -122,7 +146,7 @@ class testSonataDummyGatekeeper(SimpleTestTopology): self.stopNet() initialize_GK() -# @unittest.skip("disabled") + #@unittest.skip("disabled") def test_GK_Api_stop_service(self): # create network self.createNet(ndatacenter=2, nhosts=2) @@ -180,6 +204,8 @@ class testSonataDummyGatekeeper(SimpleTestTopology): self.stopNet() initialize_GK() + + #@unittest.skip("disabled") def test_GK_stress_service(self): # create network self.createNet(ndatacenter=2, nhosts=2)