+ con_vnfr_path = con_nsr_xpath + "/cm-vnfr[id={}]".format(quoted_key(vnfr.id))
+ con_data = proxy(RwConmanYang).get(con_vnfr_path)
+
+ assert con_data is not None
+ assert con_data.connection_point[1].ip_address == vnfd.vdu[0].interface[1].static_ip_address
+
+ xpath = "/rw-project:project[rw-project:name='default']/vlr-catalog/vlr[id={}]".format(quoted_key(vnfr.connection_point[1].vlr_ref))
+ vlr = proxy(RwVlrYang).get(xpath)
+
+ vim_client = vim_clients[cloud_account_name]
+ vm_property = vim_client.nova_server_get(vnfr.vdur[0].vim_id)
+ logger.info('VM properties for {}: {}'.format(vnfd.name, vm_property))
+
+ addr_prop_list = vm_property['addresses'][vlr.name]
+ logger.info('addresses attribute: {}'.format(addr_prop_list))
+
+ addr_prop = [addr_prop for addr_prop in addr_prop_list if addr_prop['addr'] == vnfr.connection_point[1].ip_address]
+ assert addr_prop
+
+ assert static_ip_vnfd # if False, then none of the VNF descriptors' connections points are carrying static-ip-address field.
+
+ # Check if the VMs are reachable from each other
+ username, password = ['fedora'] * 2
+ ssh_session = SshSession(ips['mgmt_ip'])
+ assert ssh_session
+ assert ssh_session.connect(username=username, password=password)
+ if not self.is_ipv6(ips['static_ip']):
+ assert ssh_session.run_command('ping -c 5 {}'.format(ips['static_ip']))[0] == 0
+
    @pytest.mark.skipif(not pytest.config.getoption("--vnf-dependencies"), reason="need --vnf-dependencies option to run")
    def test_vnf_dependencies(self, proxy):
        """Verify config-parameter dependencies between the ping and pong VNFs.

        Asserts:
        1. Match various config parameter sources with config primitive parameters.
           Three types of sources are being verified for pong vnfd.
           Attribute: a runtime value like IP address of a connection point (../../../mgmt-interface, ip-address)
           Descriptor: an XPath to a leaf in the VNF descriptor/config (../../../mgmt-interface/port)
           Value: a pre-defined constant ('admin' as mentioned in pong descriptor)
        2. Match the config-parameter-map defined in NS descriptor.
        There used to be a check to verify config parameter values in cm-state (cm-state/cm-nsr/cm-vnfr/config-parameter).
        Recently that got removed due to confd issue. So, there is no such check currently for cm-state.
        """
        # Single NS under test; take the first (nsr-config, nsr-opdata) pair.
        nsr_cfg, _ = list(yield_nsrc_nsro_pairs(proxy))[0]
        con_nsr_xpath = "/rw-project:project[rw-project:name='default']/cm-state/cm-nsr[id={}]".format(quoted_key(nsr_cfg.id))

        pong_source_map, ping_request_map = None, None

        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
            # Get cm-state for this vnfr
            # NOTE(review): con_data is fetched but never used below — leftover of
            # the removed cm-state config-parameter check mentioned in the docstring.
            con_vnfr_path = con_nsr_xpath + "/cm-vnfr[id={}]".format(quoted_key(vnfr.id))
            con_data = proxy(RwConmanYang).get(con_vnfr_path)

            # Match various config parameter sources with config primitive parameters
            for config_primitive in vnfr.vnf_configuration.config_primitive:
                if config_primitive.name in ("config", "start-stop"):
                    for parameter in config_primitive.parameter:
                        if parameter.name == 'mgmt_ip':
                            # Attribute source: resolved at runtime from the VNFR mgmt interface
                            assert parameter.default_value == vnfr.mgmt_interface.ip_address
                        if parameter.name == 'mgmt_port':
                            # Descriptor source: port is an int leaf in the VNFD; default_value is a string
                            assert parameter.default_value == str(vnfd.mgmt_interface.port)
                        if parameter.name == 'username':
                            # Value source: constant defined in the pong descriptor
                            assert parameter.default_value == 'admin'

                # Fetch the source parameter values from pong vnf and request parameter values from ping vnf
                if config_primitive.name == "config":
                    if vnfd.name == "pong_vnfd":
                        pong_source_map = [parameter.default_value for parameter in config_primitive.parameter if
                                           parameter.name in ("service_ip", "service_port")]
                    if vnfd.name == "ping_vnfd":
                        ping_request_map = [parameter.default_value for parameter in config_primitive.parameter if
                                            parameter.name in ("pong_ip", "pong_port")]
        assert pong_source_map
        assert ping_request_map
        # Match the config-parameter-map defined in NS descriptor: the values pong
        # exposes must be exactly the values ping requests (order-insensitive).
        assert sorted(pong_source_map) == sorted(ping_request_map)
+
    @pytest.mark.skipif(not pytest.config.getoption("--port-security"), reason="need --port-security option to run")
    def test_port_security(self, proxy, vim_clients, cloud_account_name):
        """Verify port-security settings propagate from VNFD through VNFR to the VIM.

        Asserts:
        1. port-security-enabled match in vnfd and vnfr
        2. Get port property from openstack. Match these attributes: 'port_security_enabled', 'security_groups'
        """
        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
            # connection_point[1] is the CP under test (presumably [0] is the
            # mgmt CP — TODO confirm against the NS package definition).
            assert vnfd.connection_point[1].port_security_enabled == vnfr.connection_point[1].port_security_enabled

            # Resolve the VLR backing this CP to learn the neutron network id.
            xpath = "/rw-project:project[rw-project:name='default']/vlr-catalog/vlr[id={}]".format(quoted_key(vnfr.connection_point[1].vlr_ref))
            vlr = proxy(RwVlrYang).get(xpath)

            # Locate the neutron port by (network_id, CP name); chained `if`s in the
            # comprehension act as an implicit AND.
            vim_client = vim_clients[cloud_account_name]
            port = [port for port in vim_client.neutron_port_list() if port['network_id'] == vlr.network_id if
                    port['name'] == vnfr.connection_point[1].name]
            assert port

            port_openstack = port[0]
            assert vnfr.connection_point[1].port_security_enabled == port_openstack['port_security_enabled']

            if vnfr.connection_point[1].port_security_enabled:
                assert port_openstack['security_groups']  # It has to carry at least one security group if enabled
            else:
                assert not port_openstack['security_groups']
+
    @pytest.mark.skipif(not pytest.config.getoption("--port-sequencing"), reason="need --port-sequencing option to run")
    def test_explicit_port_sequencing(self, proxy, vim_clients, cloud_account_name, logger, port_sequencing_intf_positions, iteration):
        """Verify explicit interface position ordering is honored inside the VM.

        Asserts:
        1. Interface count match in vnfd and vnfr
        2. Get interface ordering (mac address) from VM using 'ip a' command; from output of neutron port-list, get
           corresponding connection point names in the same order as mac address ordered list.
        3. Get interface ordering from the vnfd/vdu
        4. Compare lists from step-2 and step-3
        """
        username, password = ['fedora']*2

        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
            assert len(vnfd.vdu[0].interface) == len(vnfr.vdur[0].interface)

            logger.debug('Interface details for vnfd {}: {}'.format(vnfd.name, vnfd.vdu[0].as_dict()['interface']))

            if iteration==1:
                # Second test iteration: only the pong VNF is expected to carry
                # explicit positions (supplied by the port_sequencing_intf_positions fixture).
                tmp_positional_values_list = []
                for intf in vnfr.vdur[0].interface:
                    # if no position is specified for an interface, then vnfr/vdur/interface carries 0 as its positional value
                    if intf.position!=0:
                        tmp_positional_values_list.append(intf.position)
                if 'ping' in vnfd.name:
                    assert not tmp_positional_values_list
                if 'pong' in vnfd.name:
                    assert set(tmp_positional_values_list) == set(port_sequencing_intf_positions)

            # Get a sorted list of interfaces from vnfd/vdu:
            # positioned interfaces first (ordered by position), then the
            # position-less ones sorted by CP name.
            icp_key_name, ecp_key_name = 'internal_connection_point_ref', 'external_connection_point_ref'
            intf_with_position_field_dict, intf_without_position_field_list = {}, []

            for intf in vnfd.vdu[0].interface:
                intf = intf.as_dict()
                # An interface references either an internal or an external CP.
                cp_ref_key = icp_key_name if icp_key_name in intf else ecp_key_name
                if 'position' in intf:
                    intf_with_position_field_dict[intf['position']] = intf[cp_ref_key]
                else:
                    intf_without_position_field_list.append(intf[cp_ref_key])

            intf_with_position_field_list = sorted(intf_with_position_field_dict.items(), key=operator.itemgetter(0))
            sorted_cp_names_in_vnfd = [pos_cpname_tuple[1] for pos_cpname_tuple in intf_with_position_field_list] + \
                                      sorted(intf_without_position_field_list)

            # Establish a ssh session to VDU to get mac address list sorted by interfaces
            ssh_session = SshSession(vnfr.vdur[0].management_ip)
            assert ssh_session
            assert ssh_session.connect(username=username, password=password)
            e_code, ip_output, err = ssh_session.run_command('sudo ip a')
            assert e_code == 0
            logger.debug('Output of "ip a": {}'.format(ip_output))
            # MACs appear in 'ip a' in interface order (eth0, eth1, ...).
            mac_addr_list = re.findall(r'link/ether\s+(.*)\s+brd', ip_output)

            # exclude eth0 as it is always a mgmt-interface
            interface_starting_index = len(mac_addr_list) - len(vnfd.vdu[0].interface)
            mac_addr_list = mac_addr_list[interface_starting_index: ]

            # Get neutron port list
            neutron_port_list = vim_clients[cloud_account_name].neutron_port_list()

            # Get those ports whose mac_address value matches with one of the mac addresses in mac_addr_list
            # This new list is already sorted as the outer loop iterates over mac_addr_list
            sorted_cp_names_in_vm = [neutron_port_dict['name'] for mac in mac_addr_list for neutron_port_dict in neutron_port_list
                                     if mac==neutron_port_dict['mac_address']]

            logger.debug('Sorted connection points as per "ip a" in VM: {}'.format(sorted_cp_names_in_vm))
            logger.debug('Sorted connection points as per ordering mentioned in vnfd: {}'.format(sorted_cp_names_in_vnfd))

            assert sorted_cp_names_in_vm == sorted_cp_names_in_vnfd
+
    @pytest.mark.skipif(
        not (pytest.config.getoption("--vnf-dependencies") and
             pytest.config.getoption("--service-primitive")),
        reason="need --vnf-dependencies and --service-primitive option to run")
    def test_primitives(
            self, mgmt_session, cloud_module, cloud_account, descriptors,
            fmt_nsd_catalog_xpath, logger):
        """Testing service primitives and config primitives.

        Executes one custom NS service primitive ('primitive_test') and one VNF
        config primitive ('start-stop'), then polls the config-agent job list
        until both jobs report success, or raises JobStatusError on job failure
        or on a 60-second timeout.
        """
        # Create a cloud account
        rift.auto.mano.create_cloud_account(
            mgmt_session, cloud_account, "default")

        rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
        nsr_pxy = mgmt_session.proxy(NsrYang)
        rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)

        # Testing a custom service primitive
        ns_opdata = rwnsr_pxy.get(
            '/rw-project:project[rw-project:name="default"]' +
            '/ns-instance-opdata/nsr'
        )
        nsr_id = ns_opdata.ns_instance_config_ref
        sp_rpc_input = NsrYang.YangInput_Nsr_ExecNsServicePrimitive.from_dict(
            {'name': 'primitive_test', 'nsr_id_ref': nsr_id})
        nsr_pxy.rpc(sp_rpc_input)

        # Testing a config primitive
        vnfr_catalog = rwvnfr_pxy.get(
            '/rw-project:project[rw-project:name="default"]' +
            '/vnfr-catalog'
        )
        cp_rpc_input = NsrYang.YangInput_Nsr_ExecNsServicePrimitive.from_dict(
            {'nsr_id_ref': nsr_id})
        vnf_list = cp_rpc_input.create_vnf_list()
        vnf_primitive = vnf_list.create_vnf_primitive()
        vnf_primitive.index = 1
        vnf_primitive.name = "start-stop"
        # Target the first VNFR in the catalog for the config primitive.
        vnf_list.member_vnf_index_ref = (
            vnfr_catalog.vnfr[0].member_vnf_index_ref
        )
        vnf_list._set_vnfr_id_ref(vnfr_catalog.vnfr[0].id)
        vnf_list.vnf_primitive.append(vnf_primitive)
        cp_rpc_input.vnf_list.append(vnf_list)
        nsr_pxy.rpc(cp_rpc_input)
        # Checking nsd joblist to see if both tests passed

        def check_job_status(status=None):
            # Poll the first two config-agent jobs (one per primitive executed
            # above). Returns True when both report 'success'; raises on any
            # 'failure'; otherwise sleeps 5s and returns False so the caller
            # retries.
            # NOTE(review): indexes config_agent_job[0..1] directly — will raise
            # IndexError if the jobs have not appeared in opdata yet; confirm
            # the RPCs above create the job entries synchronously.
            ns_opdata = rwnsr_pxy.get(
                '/rw-project:project[rw-project:name="default"]' +
                '/ns-instance-opdata/nsr'
            )
            counter = 0
            counter_limit = 2
            for idx in range(0, counter_limit):
                if ns_opdata.config_agent_job[idx].job_status == 'failure':
                    err_msg = (
                        'Service primitive test failed.' +
                        ' The config agent reported failure job status')
                    raise JobStatusError(err_msg)

                elif ns_opdata.config_agent_job[idx].job_status == 'success':
                    counter += 1
                    continue

            if counter == counter_limit:
                return True
            else:
                time.sleep(5)
            return False

        start_time = time.time()
        # while/else: the else branch runs only when the loop condition expires
        # without a break, i.e. the jobs never reached success within 60 seconds.
        while (time.time() - start_time < 60):
            status = check_job_status()
            if status:
                break
        else:
            err_msg = (
                'Service primitive test failed. Timed out: 60 seconds' +
                'The config agent never reached a success status')
            raise JobStatusError(err_msg)
+
    @pytest.mark.skipif(
        not (pytest.config.getoption("--metadata-vdud") or pytest.config.getoption("--metadata-vdud-cfgfile")),
        reason="need --metadata-vdud or --metadata-vdud-cfgfile option to run")
    def test_metadata_vdud(self, logger, proxy, vim_clients, cloud_account_name, metadata_host):
        """Verify VDU supplemental-boot-data (metadata, config-drive, config files).

        Asserts:
        1. content of supplemental-boot-data match in vnfd and vnfr
           vnfr may carry extra custom-meta-data fields (e.g pci_assignement) which are by default enabled during VM creation by openstack.
           vnfr doesn't carry config_file details; so that will be skipped during matching.
        2. boot-data-drive match with openstack VM's config_drive attribute
        3. For each VDUD which have config-file fields mentioned, check if there exists a path in the VM which
           matches with config-file's dest field. (Only applicable for cirros_cfgfile_vnfd VNF RIFT-15524)
        4. For each VDUD, match its custom-meta-data fields with openstack VM's properties field
        """
        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
            # Pick SSH credentials based on the image the VNF is built from.
            if any(name in vnfd.name for name in ['ping', 'pong', 'fedora']):
                username, password = ['fedora'] * 2
            elif 'ubuntu' in vnfd.name:
                username, password = ['ubuntu'] * 2
            elif 'cirros' in vnfd.name:
                username, password = 'cirros', 'cubswin:)'
            else:
                assert False, 'Not expected to use this VNFD {} in this systemtest. VNFD might have changed. Exiting the test.'.format(
                    vnfd.name)

            # Wait till VNF's operational-status becomes 'running'
            # The below check is usually covered as part of test_wait_for_ns_configured
            # But, this is mostly needed when non- ping pong packages are used e.g cirrus cfgfile package
            xpath = "/rw-project:project[rw-project:name='default']/vnfr-catalog/vnfr[id={}]/operational-status".format(quoted_key(vnfr.id))
            proxy(VnfrYang).wait_for(xpath, "running", timeout=300)
            time.sleep(5)

            # Get the VDU details from openstack
            vim_client = vim_clients[cloud_account_name]
            vm_property = vim_client.nova_server_get(vnfr.vdur[0].vim_id)
            logger.info('VM property for {}: {}'.format(vnfd.name, vm_property))

            # Establish a ssh session to VDU
            ssh_session = SshSession(vnfr.vdur[0].management_ip)
            assert ssh_session
            assert ssh_session.connect(username=username, password=password)

            assert vnfd.vdu[0].supplemental_boot_data.boot_data_drive == vnfr.vdur[
                0].supplemental_boot_data.boot_data_drive == bool(vm_property['config_drive'])
            # Using bool() because vm_property['config_drive'] returns 'True' or '' whereas vnfr/vnfd returns True/False

            # Assert 3: only for cirros vnf
            if 'cirros' in vnfd.name:
                for config_file in vnfd.vdu[0].supplemental_boot_data.config_file:
                    # 'test -e' exits 0 iff the dest path exists in the guest.
                    assert ssh_session.run_command('test -e {}'.format(config_file.dest))[0] == 0

            vdur_metadata = {metadata.name: metadata.value for metadata in
                             vnfr.vdur[0].supplemental_boot_data.custom_meta_data}

            # Get the user-data/metadata from VM
            # (queried via the in-guest OpenStack metadata service endpoint)
            e_code, vm_metadata, _ = ssh_session.run_command(
                'curl http://{}/openstack/latest/meta_data.json'.format(metadata_host))
            assert e_code == 0
            vm_metadata = json.loads(vm_metadata)['meta']
            logger.debug('VM metadata for {}: {}'.format(vnfd.name, vm_metadata))

            # Assert 1 & 4: every VDUD metadata entry must appear, with the same
            # value, in both the VNFR and the VM's metadata (extra VNFR/VM
            # entries are tolerated — see docstring).
            for vdud_metadata in vnfd.vdu[0].supplemental_boot_data.custom_meta_data:
                assert vdud_metadata.value == vdur_metadata[vdud_metadata.name]
                assert vdud_metadata.value == vm_metadata[vdud_metadata.name]
+
    @pytest.mark.skipif(not pytest.config.getoption("--multidisk"), reason="need --multidisk option to run")
    def test_multidisk(self, logger, proxy, vim_clients, cloud_account_name, multidisk_testdata):
        """
        This feature is only supported in openstack, brocade vCPE.
        Asserts:
        1. volumes match in vnfd and vnfr
        2. volumes match in vnfr and openstack host
           Check no of volumes attached to the VNF VM. It should match no of volumes defined in VDUD.
           Match volume names. In 'openstack volume show <vol_uuid>', the device should be /dev/<volume_name_in_vdud>
           Match the volume source.
           Match the volume size.
           Match the Volume IDs mentioned in VNFR with openstack volume's ID.
        """
        ping_test_data, pong_test_data = multidisk_testdata
        # Attribute names indexed to align with each volume's test-data tuple;
        # index 1 is None because device_bus doesn't appear in vnfr/vdur.
        vol_attr = ['device_type', None, 'size', 'image', 'boot_priority']

        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
            logger.info('Verifying VNF {}'.format(vnfd.name))
            vnf_testdata = ping_test_data if 'ping' in vnfd.name else pong_test_data

            # Assert 1: Match volumes in vnfd, vnfr, test data
            assert len(vnfd.vdu[0].volumes) == len(vnfr.vdur[0].volumes)

            for vnfr_vol in vnfr.vdur[0].volumes:
                logger.info('Verifying vnfr volume: {}'.format(vnfr_vol.as_dict()))
                vnfd_vol = [vol for vol in vnfd.vdu[0].volumes if vol.name==vnfr_vol.name][0]

                vol_testdata = vnf_testdata[vnfr_vol.name]

                for i, attr in enumerate(vol_attr):
                    # NOTE(review): 'attr == None' should idiomatically be
                    # 'attr is None' (behavior is the same here).
                    if attr == None: # device_bus doesn't appear in vnfr/vdur
                        continue
                    if i == 3 and (vol_testdata[i]==None or getattr(vnfd_vol, 'ephemeral')):
                        # volume source of type ephemeral doesn't appear in vnfr/vdur
                        # If no image is defined for a volume, getattr(vnfr_vol, 'ephemeral') returns False. Strange. RIFT-15165
                        assert not getattr(vnfd_vol, 'image')
                        continue

                    assert getattr(vnfd_vol, attr) == getattr(vnfr_vol, attr)
                    # Test data may leave attributes unspecified (None) — skip those.
                    if vol_testdata[i] is not None:
                        assert getattr(vnfd_vol, attr) == vol_testdata[i]

            # Assert 2: Volumes match in vnfr and openstack host
            # Get VM properties from the VIM
            vim_client = vim_clients[cloud_account_name]
            vm_property = vim_client.nova_server_get(vnfr.vdur[0].vim_id)
            logger.info('VIM- VM properties: {}'.format(vm_property))

            # Get the volumes attached to this VNF VM
            vim_volumes = vm_property['os-extended-volumes:volumes_attached']
            logger.info('VIM- Volumes attached to this VNF VM: {}'.format(vim_volumes))

            assert vim_volumes
            assert len(vim_volumes) == len(vnfr.vdur[0].volumes)

            vnfr_volumes_by_id = {vol.volume_id:vol for vol in vnfr.vdur[0].volumes}
            for vim_volume in vim_volumes:
                # Match the Volume IDs mentioned in VNFR with openstack volume's ID.
                logger.info('Verifying volume: {}'.format(vim_volume['id']))
                assert vim_volume['id'] in vnfr_volumes_by_id.keys()
                vnfr_vol_ = vnfr_volumes_by_id[vim_volume['id']]

                # Get volume details. Equivalent cli: openstack volume show <uuid>
                vim_vol_attrs = vim_client.cinder_volume_get(vim_volume['id'])

                # Match volume size
                assert vnfr_vol_.size == vim_vol_attrs.size

                # Match volume source
                if vnfr_vol_.image: # To make sure this is not ephemeral type
                    logger.info('VIM- Image details of the volume: {}'.format(vim_vol_attrs.volume_image_metadata))
                    assert vnfr_vol_.image == vim_vol_attrs.volume_image_metadata['image_name']
                else:
                    # Ephemeral volumes carry no image metadata at the VIM either.
                    assert not hasattr(vim_vol_attrs, 'volume_image_metadata')

                # Match volume name e.g 'device': u'/dev/vdf'
                logger.info('Verifying [{}] in attached volumes {}'.format(vnfr_vol_.name, vim_vol_attrs.attachments))
                assert [attachment for attachment in vim_vol_attrs.attachments if vnfr_vol_.name in attachment['device']]
+
    @pytest.mark.skipif(not pytest.config.getoption("--l2-port-chaining"), reason="need --l2-port-chaining option to run")
    def test_l2_port_chaining(self, proxy):
        """
        It uses existing NS, VNF packages: $RIFT_INSTALL/usr/rift/mano/nsds/vnffg_demo_nsd/vnffg_l2portchain_*.
        This test function is specific to these packages. Those VNFs use Ubuntu trusty image ubuntu_trusty_1404.qcow2.
        Asserts:
        1. Count of VNFFG in nsd and nsr
        2. Count of rsp, classifier in VNFFG descriptor and VNFFG record
        3. Need details what other fields need to be matched in nsd and nsr
        4. Traffic flows through internal hops as per the classifier and rsp
           As per the classifiers in NS package, the following flows will be tested.
           - Tcp packets with dest port 80 starting from pgw VNF should go through Firewall VNF.
           - Udp packets with source port 80 starting from router VNF should go through nat->dpi
           - Udp packets with dest port 80 starting from pgw VNF should go through dpi->nat
        """
        UDP_PROTOCOL, TCP_PROTOCOL = 17, 6

        def pcap_analysis(pcap_file, src_add, dst_add, src_port=None, dst_port=None, protocol=6):
            """Analyse packets in a pcap file and return the timestamp of the first matching packet.

            Args:
                pcap_file: pcap file that is generated by traffic analysis utility such as tcpdump
                src_add, dst_add: Source & dest IP which need to be matched for a packet
                src_port, dst_port: optional L4 ports; matched only when supplied
                protocol: Protocol that needs to be matched for a packet which already matched src_add, dst_add
                    (protocol accepts integer e.g TCP 6, UDP 17)

            Returns:
                timestamp of the packet which is matched (Needed to check packet flow order through VNFs)
                or
                False: if there is no packet match

            It uses scapy module to analyse pcap file. pip3 install scapy-python3
            Other options https://pypi.python.org/pypi/pypcapfile
            """
            assert os.path.exists(pcap_file)
            # Choose the scapy layer class to inspect for port matching.
            pkt_type = TCP if protocol==6 else UDP

            pcap_obj = rdpcap(pcap_file)
            for pkt in pcap_obj:
                if IP in pkt:
                    if not(pkt[IP].src==src_add and pkt[IP].dst==dst_add and pkt[IP].proto==protocol):
                        continue
                    if pkt_type in pkt:
                        if src_port:
                            if not (pkt[pkt_type].sport==src_port):
                                continue
                        if dst_port:
                            if not (pkt[pkt_type].dport==dst_port):
                                continue
                        # First full match wins; its capture timestamp orders hops.
                        return pkt[IP].time
            return False

        # Check the VNFFG in nsd and nsr
        for nsd, nsr in yield_nsd_nsr_pairs(proxy):
            vnffgds = nsd.vnffgd
            vnffgrs = nsr.vnffgr
            assert len(vnffgds) == len(vnffgrs)

            # Check the classifier, rsp in nsd and nsr
            for vnffgd in vnffgds:
                # Pair each descriptor with its record via vnffgd-id-ref.
                vnffgr = [vnffgr for vnffgr in vnffgrs if vnffgd.id == vnffgr.vnffgd_id_ref][0]
                assert len(vnffgd.rsp) == len(vnffgr.rsp)
                assert len(vnffgd.classifier) == len(vnffgr.classifier)

        vnfrs = proxy(RwVnfrYang).get('/rw-project:project[rw-project:name="default"]/vnfr-catalog/vnfr', list_obj=True)

        # Get the IP of VMs
        # (mgmt IPs for SSH access; CP IPs for generating/matching traffic —
        # assumes each VM name substring-matches exactly one VNFR)
        vm_names = ('router', 'firewall', 'dpi', 'nat', 'pgw')
        vm_ips = {vm_name: vnfr.vdur[0].vm_management_ip for vm_name in vm_names for vnfr in vnfrs.vnfr if
                  vm_name in vnfr.name}
        vm_cp_ips = {vm_name: vnfr.connection_point[0].ip_address for vm_name in vm_names for vnfr in vnfrs.vnfr if
                     vm_name in vnfr.name}

        # Establish Ssh sessions to the VMs
        ssh_sessions = {}
        for vm_name, vm_ip in vm_ips.items():
            ssh_session = SshSession(vm_ip)
            assert ssh_session
            assert ssh_session.connect(username='ubuntu', password='ubuntu')
            ssh_sessions[vm_name] = ssh_session

        # Start python's SimpleHTTPServer on port 80 in the router VM
        e_code, _, _ = ssh_sessions['router'].run_command('sudo python -m SimpleHTTPServer 80', max_wait=5)
        assert e_code is None # Due to blocking call, it should timeout and return 'None' as exit code


        # Check: Tcp packets with dest port 80 starting from pgw VNF should go through Firewall VNF.
        pcap_file = 'l2test_firewall.pcap'
        # Start tcpdump in firewall vnf and start sending tcp packets from pgw vnf
        # (tcpdump self-terminates after ~10s; max_wait=4 returns control early)
        e_code, _, _ = ssh_sessions['firewall'].run_command(
            'sudo tcpdump -i eth1 -w {pcap} & sleep 10; sudo kill $!'.format(pcap=pcap_file), max_wait=4)
        e_code, _, _ = ssh_sessions['pgw'].run_command('sudo nc {router_ip} 80 -w 0'.format(router_ip=vm_cp_ips['router']))

        # Copy pcap file from firewall vnf for packet analysis
        time.sleep(10)
        assert ssh_sessions['firewall'].get(pcap_file, pcap_file)
        assert pcap_analysis(pcap_file, vm_cp_ips['pgw'], vm_cp_ips['router'], dst_port=80, protocol=TCP_PROTOCOL)


        # Check: Udp packets with source port 80 starting from router VNF should go through nat->dpi
        pcap_nat = 'l2test_nat1.pcap'
        pcap_dpi = 'l2test_dpi1.pcap'
        # Start tcpdump in nat, dpi vnf and start sending udp packets from router vnf
        e_code, _, _ = ssh_sessions['nat'].run_command(
            'sudo tcpdump -i eth1 -w {pcap} & sleep 15; sudo kill $!'.format(pcap=pcap_nat), max_wait=4)
        e_code, _, _ = ssh_sessions['dpi'].run_command(
            'sudo tcpdump -i eth1 -w {pcap} & sleep 10; sudo kill $!'.format(pcap=pcap_dpi), max_wait=4)
        e_code, _, _ = ssh_sessions['router'].run_command(
            'echo -n "hello" | sudo nc -4u {pgw_ip} 1000 -s {router_ip} -p 80 -w 0'.format(pgw_ip=vm_cp_ips['pgw'],
                                                                                           router_ip=vm_cp_ips[
                                                                                               'router']))

        # Copy pcap file from nat, dpi vnf for packet analysis
        time.sleep(10)
        assert ssh_sessions['nat'].get(pcap_nat, pcap_nat)
        assert ssh_sessions['dpi'].get(pcap_dpi, pcap_dpi)
        packet_ts_nat = pcap_analysis(pcap_nat, vm_cp_ips['router'], vm_cp_ips['pgw'], src_port=80, protocol=UDP_PROTOCOL)
        packet_ts_dpi = pcap_analysis(pcap_dpi, vm_cp_ips['router'], vm_cp_ips['pgw'], src_port=80, protocol=UDP_PROTOCOL)
        assert packet_ts_nat
        assert packet_ts_dpi
        # Capture timestamps establish hop order across the chain.
        assert packet_ts_nat < packet_ts_dpi # Packet flow must follow nat -> dpi


        # Check: Udp packets with dest port 80 starting from pgw VNF should go through dpi->nat
        pcap_nat = 'l2test_nat2.pcap'
        pcap_dpi = 'l2test_dpi2.pcap'
        # Start tcpdump in nat, dpi vnf and start sending udp packets from router vnf
        e_code, _, _ = ssh_sessions['nat'].run_command(
            'sudo tcpdump -i eth1 -w {pcap} & sleep 15; sudo kill $!'.format(pcap=pcap_nat), max_wait=4)
        e_code, _, _ = ssh_sessions['dpi'].run_command(
            'sudo tcpdump -i eth1 -w {pcap} & sleep 10; sudo kill $!'.format(pcap=pcap_dpi), max_wait=4)
        e_code, _, _ = ssh_sessions['pgw'].run_command(
            'echo -n "hello" | sudo nc -4u {router_ip} 80 -w 0'.format(router_ip=vm_cp_ips['router']))

        # Copy pcap file from nat, dpi vnf for packet analysis
        time.sleep(10)
        assert ssh_sessions['nat'].get(pcap_nat, pcap_nat)
        assert ssh_sessions['dpi'].get(pcap_dpi, pcap_dpi)
        packet_ts_nat = pcap_analysis(pcap_nat, vm_cp_ips['pgw'], vm_cp_ips['router'], dst_port=80, protocol=UDP_PROTOCOL)
        packet_ts_dpi = pcap_analysis(pcap_dpi, vm_cp_ips['pgw'], vm_cp_ips['router'], dst_port=80, protocol=UDP_PROTOCOL)
        assert packet_ts_nat
        assert packet_ts_dpi
        # The below assert used to fail while testing. ts_dpi is ahead of ts_nat in few microseconds
        # Need to confirm if thats expected
        assert packet_ts_dpi < packet_ts_nat # Packet flow must follow dpi -> nat