#
-# Copyright 2016 RIFT.IO Inc
+# Copyright 2016-2017 RIFT.io Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
import collections
+import gi
+import json
+import operator
+import os
+import pytest
+import re
import socket
import subprocess
import time
-import pytest
-
-import gi
-import re
+from scapy.all import rdpcap, UDP, TCP, IP
gi.require_version('RwNsrYang', '1.0')
from gi.repository import (
- NsdYang,
+ RwProjectNsdYang,
RwBaseYang,
RwConmanYang,
RwNsrYang,
- RwNsdYang,
RwVcsYang,
RwVlrYang,
- RwVnfdYang,
+ RwProjectVnfdYang,
RwVnfrYang,
VlrYang,
VnfrYang,
+ NsrYang,
)
+import rift.auto.mano
import rift.auto.session
import rift.mano.examples.ping_pong_nsd as ping_pong
+from rift.auto.ssh import SshSession
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
@pytest.fixture(scope='module')
'''
return ping_pong_factory.generate_descriptors()
+@pytest.fixture(scope='session')
+def updated_ping_pong_descriptors(updated_ping_pong_records):
+    '''Fixture which returns a set of updated descriptors that can be configured through
+    the management interface.
+
+    The descriptors generated by the descriptor generation process for packages don't include project
+    information (presumably in order to avoid tying them to particular project). Here they are converted
+    to types that include project information which can then be used to configure the system.
+
+    Returns:
+        Tuple of (ping VNFD, pong VNFD, ping-pong NSD) as project-scoped YANG objects.
+    '''
+    ping, pong, ping_pong = updated_ping_pong_records
+    # The as_dict()/from_dict() round-trip re-types each descriptor into its
+    # rw-project (project-scoped) YANG variant; the NSD sits under the 'nsd' list key.
+    proj_ping_vnfd = RwProjectVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd.from_dict(ping.vnfd.as_dict())
+    proj_pong_vnfd = RwProjectVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd.from_dict(pong.vnfd.as_dict())
+    proj_ping_pong_nsd = RwProjectNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd.from_dict(ping_pong.descriptor.as_dict()['nsd'][0])
+    return proj_ping_vnfd, proj_pong_vnfd, proj_ping_pong_nsd
+
+
+class JobStatusError(Exception):
+    """Raised when a config-agent job reports failure or never reaches success."""
+
+    pass
+
+
def yield_vnfd_vnfr_pairs(proxy, nsr=None):
"""
Yields tuples of vnfd & vnfr entries.
Tuple: VNFD and its corresponding VNFR entry
"""
def get_vnfd(vnfd_id):
- xpath = "/vnfd-catalog/vnfd[id='{}']".format(vnfd_id)
- return proxy(RwVnfdYang).get(xpath)
+ xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd_id))
+ return proxy(RwProjectVnfdYang).get(xpath)
- vnfr = "/vnfr-catalog/vnfr"
+ vnfr = "/rw-project:project[rw-project:name='default']/vnfr-catalog/vnfr"
vnfrs = proxy(RwVnfrYang).get(vnfr, list_obj=True)
for vnfr in vnfrs.vnfr:
if vnfr.id not in const_vnfr_ids:
continue
- vnfd = get_vnfd(vnfr.vnfd_ref)
+ vnfd = get_vnfd(vnfr.vnfd.id)
yield vnfd, vnfr
"""
for nsr_cfg, nsr in yield_nsrc_nsro_pairs(proxy):
- nsd_path = "/nsd-catalog/nsd[id='{}']".format(
- nsr_cfg.nsd.id)
- nsd = proxy(RwNsdYang).get_config(nsd_path)
+ nsd_path = "/rw-project:project[rw-project:name='default']/nsd-catalog/nsd[id={}]".format(
+ quoted_key(nsr_cfg.nsd.id))
+ nsd = proxy(RwProjectNsdYang).get_config(nsd_path)
yield nsd, nsr
Yields:
Tuple: NSR config and its corresponding NSR op record
"""
- nsr = "/ns-instance-opdata/nsr"
+ nsr = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr"
nsrs = proxy(RwNsrYang).get(nsr, list_obj=True)
for nsr in nsrs.nsr:
- nsr_cfg_path = "/ns-instance-config/nsr[id='{}']".format(
- nsr.ns_instance_config_ref)
+ nsr_cfg_path = "/rw-project:project[rw-project:name='default']/ns-instance-config/nsr[id={}]".format(
+ quoted_key(nsr.ns_instance_config_ref))
nsr_cfg = proxy(RwNsrYang).get_config(nsr_cfg_path)
yield nsr_cfg, nsr
boolean
"""
try:
- socket.inet_aton(address)
- except socket.error:
- return False
- else:
+ socket.inet_pton(socket.AF_INET, address)
return True
+ except socket.error:
+ try:
+ socket.inet_pton(socket.AF_INET6, address)
+ return True
+ except socket.error:
+ return False
+    def is_ipv6(self, address):
+        """Returns True if address is of type 'IPv6', else False.
+
+        socket.inet_pton raises socket.error for anything that is not a
+        well-formed IPv6 address, which is treated here as "not IPv6".
+        """
+        try:
+            socket.inet_pton(socket.AF_INET6, address)
+            return True
+        except socket.error:
+            return False
@pytest.mark.feature("recovery")
def test_tasklets_recovery(self, mgmt_session, proxy, recover_tasklet):
def test_records_present(self, proxy):
assert_records(proxy)
- def test_nsd_ref_count(self, proxy):
- """
- Asserts
- 1. The ref count data of the NSR with the actual number of NSRs
- """
- nsd_ref_xpath = "/ns-instance-opdata/nsd-ref-count"
- nsd_refs = proxy(RwNsrYang).get(nsd_ref_xpath, list_obj=True)
-
- expected_ref_count = collections.defaultdict(int)
- for nsd_ref in nsd_refs.nsd_ref_count:
- expected_ref_count[nsd_ref.nsd_id_ref] = nsd_ref.instance_ref_count
-
- actual_ref_count = collections.defaultdict(int)
- for nsd, nsr in yield_nsd_nsr_pairs(proxy):
- actual_ref_count[nsd.id] += 1
-
- assert expected_ref_count == actual_ref_count
-
def test_vnfd_ref_count(self, proxy):
"""
Asserts
1. The ref count data of the VNFR with the actual number of VNFRs
"""
- vnfd_ref_xpath = "/vnfr-catalog/vnfd-ref-count"
+ vnfd_ref_xpath = "/rw-project:project[rw-project:name='default']/vnfr-catalog/vnfd-ref-count"
vnfd_refs = proxy(RwVnfrYang).get(vnfd_ref_xpath, list_obj=True)
expected_ref_count = collections.defaultdict(int)
for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
assert vnfd.mgmt_interface.port == vnfr.mgmt_interface.port
assert len(vnfd.vdu) == len(vnfr.vdur)
-
for vdud, vdur in zip(vnfd.vdu, vnfr.vdur):
- assert vdud.vm_flavor == vdur.vm_flavor
+ for field in vdud.vm_flavor.fields:
+ if field in vdur.vm_flavor.fields:
+ assert getattr(vdud.vm_flavor, field) == getattr(vdur.vm_flavor, field)
assert self.is_valid_ip(vdur.management_ip) is True
- assert vdud.external_interface[0].vnfd_connection_point_ref == \
- vdur.external_interface[0].vnfd_connection_point_ref
+
+ vdur_intf_dict = {}
+ for intf in vdur.interface:
+ vdur_intf_dict[intf.name] = intf.external_connection_point_ref if 'external_connection_point_ref' in \
+ intf.as_dict() else intf.internal_connection_point_ref
+ for intf in vdud.interface:
+ assert intf.name in vdur_intf_dict
+ if intf.internal_connection_point_ref:
+ vdud_intf_cp_ref = intf.internal_connection_point_ref
+ else:
+ vdud_intf_cp_ref = intf.external_connection_point_ref
+ assert vdur_intf_dict[intf.name] == vdud_intf_cp_ref
def test_external_vl(self, proxy):
"""
assert cp_des[0].name == cp_rec[0].name
assert self.is_valid_ip(cp_rec[0].ip_address) is True
- xpath = "/vlr-catalog/vlr[id='{}']".format(cp_rec[0].vlr_ref)
+ xpath = "/rw-project:project[rw-project:name='default']/vlr-catalog/vlr[id={}]".format(quoted_key(cp_rec[0].vlr_ref))
vlr = proxy(RwVlrYang).get(xpath)
assert len(vlr.network_id) > 0
assert self.is_valid_ip(ip) is True
assert vlr.operational_status == "running"
-
+ @pytest.mark.skipif(pytest.config.getoption("--port-sequencing"), reason="port-sequencing test uses two VLs in NSD")
def test_nsr_record(self, proxy):
"""
Currently we only test for the components of NSR tests. Ignoring the
"""
for nsr_cfg, nsr in yield_nsrc_nsro_pairs(proxy):
# 1 n/w and 2 connection points
- assert len(nsr.vlr) == 1
+ assert len(nsr.vlr) == 2
assert len(nsr.vlr[0].vnfr_connection_point_ref) == 2
assert len(nsr.constituent_vnfr_ref) == 2
assert nsr_cfg.admin_status == 'ENABLED'
-    def test_wait_for_pingpong_configured(self, proxy):
-        nsr_opdata = proxy(RwNsrYang).get('/ns-instance-opdata')
+    def test_wait_for_ns_configured(self, proxy):
+        """Wait (up to 400s) for the single NS instance's config-status to become 'configured'."""
+        nsr_opdata = proxy(RwNsrYang).get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
        nsrs = nsr_opdata.nsr
        assert len(nsrs) == 1
        current_nsr = nsrs[0]
-        xpath = "/ns-instance-opdata/nsr[ns-instance-config-ref='{}']/config-status".format(current_nsr.ns_instance_config_ref)
+        xpath = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format(quoted_key(current_nsr.ns_instance_config_ref))
        proxy(RwNsrYang).wait_for(xpath, "configured", timeout=400)
- def test_monitoring_params(self, proxy):
+    def test_wait_for_pingpong_vnf_configured(self, proxy):
+        """Wait (up to 400s each) for every VNFR's config-status to reach 'configured'."""
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            xpath = "/rw-project:project[rw-project:name='default']/vnfr-catalog/vnfr[id={}]/config-status".format(quoted_key(vnfr.id))
+            proxy(VnfrYang).wait_for(xpath, "configured", timeout=400)
+
+ def test_vnf_monitoring_params(self, proxy):
"""
Asserts:
1. The value counter ticks?
2. If the meta fields are copied over
"""
def mon_param_record(vnfr_id, mon_param_id):
- return '/vnfr-catalog/vnfr[id="{}"]/monitoring-param[id="{}"]'.format(
- vnfr_id, mon_param_id)
+ return '/rw-project:project[rw-project:name="default"]/vnfr-catalog/vnfr[id={}]/monitoring-param[id={}]'.format(
+ quoted_key(vnfr_id), quoted_key(mon_param_id))
for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
for mon_des in (vnfd.monitoring_param):
# Tick check
#assert mon_rec.value_integer > 0
- def test_cm_nsr(self, proxy):
+    def test_ns_monitoring_params(self, logger, proxy):
+        """
+        Asserts:
+        1. monitoring-param match in nsd and ns-opdata
+        2. The value counter ticks?
+        """
+        # xpath template; both keys are filled in with quoted_key()-escaped values below.
+        mon_param_path = '/rw-project:project[rw-project:name="default"]/ns-instance-opdata/nsr[ns-instance-config-ref={}]/monitoring-param[id={}]'
+        def fetch_monparam_value(nsr_ref, mon_param_id):
+            """Returns the monitoring parameter value"""
+            mon_param = proxy(NsrYang).get(mon_param_path.format(quoted_key(nsr_ref), quoted_key(mon_param_id)))
+            return mon_param.value_integer
+
+        def check_monparam_value(nsr_ref, mon_param_id):
+            """Check if monitoring-param values are getting updated"""
+            recent_mon_param_value = fetch_monparam_value(nsr_ref, mon_param_id)
+
+            # Monitor the values over a period of 60 secs. Fail the test if there is no update in mon-param value.
+            s_time = time.time()
+            while (time.time() - s_time) < 60:
+                if fetch_monparam_value(nsr_ref, mon_param_id) > recent_mon_param_value:
+                    return
+                time.sleep(5)
+            assert False, 'mon-param values are not getting updated. Last value was {}'.format(recent_mon_param_value)
+
+        for nsd, nsr in yield_nsd_nsr_pairs(proxy):
+            assert len(nsd.monitoring_param) == len(nsr.monitoring_param)
+            for mon_param in nsr.monitoring_param:
+                logger.info('Verifying monitoring-param: {}'.format(mon_param.as_dict()))
+                check_monparam_value(nsr.ns_instance_config_ref, mon_param.id)
+
+ def test_cm_nsr(self, proxy, use_accounts):
"""
Asserts:
1. The ID of the NSR in cm-state
4. State of the cm-nsr
"""
for nsd, nsr in yield_nsd_nsr_pairs(proxy):
- con_nsr_xpath = "/cm-state/cm-nsr[id='{}']".format(nsr.ns_instance_config_ref)
+ con_nsr_xpath = "/rw-project:project[rw-project:name='default']/cm-state/cm-nsr[id={}]".format(
+ quoted_key(nsr.ns_instance_config_ref))
con_data = proxy(RwConmanYang).get(con_nsr_xpath)
- assert con_data.name == "ping_pong_nsd"
+ if not use_accounts:
+ assert con_data.name == rift.auto.mano.resource_name(nsd.name)
+
assert len(con_data.cm_vnfr) == 2
state_path = con_nsr_xpath + "/state"
2. Name of the vnfr
3. State of the VNFR
4. Checks for a reachable IP in mgmt_interface
- 5. Basic checks for connection point and cfg_location.
+ 5. Basic checks for connection point
"""
def is_reachable(ip, timeout=10):
rc = subprocess.call(["ping", "-c1", "-w", str(timeout), ip])
return False
nsr_cfg, _ = list(yield_nsrc_nsro_pairs(proxy))[0]
- con_nsr_xpath = "/cm-state/cm-nsr[id='{}']".format(nsr_cfg.id)
+ con_nsr_xpath = "/rw-project:project[rw-project:name='default']/cm-state/cm-nsr[id={}]".format(quoted_key(nsr_cfg.id))
for _, vnfr in yield_vnfd_vnfr_pairs(proxy):
- con_vnfr_path = con_nsr_xpath + "/cm-vnfr[id='{}']".format(vnfr.id)
+ con_vnfr_path = con_nsr_xpath + "/cm-vnfr[id={}]".format(quoted_key(vnfr.id))
con_data = proxy(RwConmanYang).get(con_vnfr_path)
assert con_data is not None
con_data = proxy(RwConmanYang).get(con_vnfr_path)
assert is_reachable(con_data.mgmt_interface.ip_address) is True
- assert len(con_data.connection_point) == 1
- connection_point = con_data.connection_point[0]
- assert connection_point.name == vnfr.connection_point[0].name
- assert connection_point.ip_address == vnfr.connection_point[0].ip_address
+ if pytest.config.getoption("--port-sequencing"):
+ # there are more than one connection point in the VNFDs for port sequencing test
+ # there is no distinction between icp and cp in 'show cm-state'.
+ # both icp and cp come under connection-point in 'show cm-state'
+ vnfr_intl_extl_connection_points_dict = {}
+ for icp in vnfr.vdur[0].internal_connection_point:
+ vnfr_intl_extl_connection_points_dict[icp.name] = icp.ip_address
+ for cp in vnfr.connection_point:
+ vnfr_intl_extl_connection_points_dict[cp.name] = cp.ip_address
+
+ assert len(con_data.connection_point) == len(vnfr_intl_extl_connection_points_dict)
+ for cp in con_data.connection_point:
+ assert cp.name in vnfr_intl_extl_connection_points_dict
+ assert cp.ip_address == vnfr_intl_extl_connection_points_dict[cp.name]
+ else:
+ assert len(con_data.connection_point) == 2
+ connection_point = con_data.connection_point[0]
+ assert connection_point.name == vnfr.connection_point[0].name
+ assert connection_point.ip_address == vnfr.connection_point[0].ip_address
+
+    @pytest.mark.skipif(
+        not (pytest.config.getoption("--static-ip") or pytest.config.getoption("--update-vnfd-instantiate")),
+        reason="need --static-ip or --update-vnfd-instantiate option to run")
+    def test_static_ip(self, proxy, logger, vim_clients, cloud_account_name):
+        """
+        Asserts:
+        1. static-ip match in vnfd and vnfr
+        2. static-ip match in cm-state
+        3. Get the IP of openstack VM. Match the static-ip
+        4. Check if the VMs are reachable from each other (Skip if type of static ip addresses is IPv6)
+        """
+        # NOTE(review): interface[1]/connection_point[1] assumes the descriptor's second
+        # interface is the one carrying the static IP — confirm against the test descriptors.
+        nsr_cfg, _ = list(yield_nsrc_nsro_pairs(proxy))[0]
+        con_nsr_xpath = "/rw-project:project[rw-project:name='default']/cm-state/cm-nsr[id={}]".format(quoted_key(nsr_cfg.id))
+
+        ips = {}
+        static_ip_vnfd = False
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            if vnfd.vdu[0].interface[1].static_ip_address:
+                static_ip_vnfd = True
+                assert vnfd.vdu[0].interface[1].static_ip_address == vnfr.connection_point[1].ip_address
+                if 'ping' in vnfd.name:
+                    ips['mgmt_ip'] = vnfr.vdur[0].management_ip
+                else:
+                    ips['static_ip'] = vnfd.vdu[0].interface[1].static_ip_address
-        assert con_data.cfg_location is not None
+                con_vnfr_path = con_nsr_xpath + "/cm-vnfr[id={}]".format(quoted_key(vnfr.id))
+                con_data = proxy(RwConmanYang).get(con_vnfr_path)
+
+                assert con_data is not None
+                assert con_data.connection_point[1].ip_address == vnfd.vdu[0].interface[1].static_ip_address
+
+                xpath = "/rw-project:project[rw-project:name='default']/vlr-catalog/vlr[id={}]".format(quoted_key(vnfr.connection_point[1].vlr_ref))
+                vlr = proxy(RwVlrYang).get(xpath)
+
+                vim_client = vim_clients[cloud_account_name]
+                vm_property = vim_client.nova_server_get(vnfr.vdur[0].vim_id)
+                logger.info('VM properties for {}: {}'.format(vnfd.name, vm_property))
+
+                addr_prop_list = vm_property['addresses'][vlr.name]
+                logger.info('addresses attribute: {}'.format(addr_prop_list))
+
+                addr_prop = [addr_prop for addr_prop in addr_prop_list if addr_prop['addr'] == vnfr.connection_point[1].ip_address]
+                assert addr_prop
+
+        assert static_ip_vnfd # if False, then none of the VNF descriptors' connections points are carrying static-ip-address field.
+
+        # Check if the VMs are reachable from each other
+        username, password = ['fedora'] * 2
+        ssh_session = SshSession(ips['mgmt_ip'])
+        assert ssh_session
+        assert ssh_session.connect(username=username, password=password)
+        if not self.is_ipv6(ips['static_ip']):
+            assert ssh_session.run_command('ping -c 5 {}'.format(ips['static_ip']))[0] == 0
+
+    @pytest.mark.skipif(not pytest.config.getoption("--vnf-dependencies"), reason="need --vnf-dependencies option to run")
+    def test_vnf_dependencies(self, proxy):
+        """
+        Asserts:
+        1. Match various config parameter sources with config primitive parameters
+            Three types of sources are being verified for pong vnfd.
+            Attribute: A runtime value like IP address of a connection point (../../../mgmt-interface, ip-address)
+            Descriptor: a XPath to a leaf in the VNF descriptor/config (../../../mgmt-interface/port)
+            Value: A pre-defined constant ('admin' as mentioned in pong descriptor)
+        2. Match the config-parameter-map defined in NS descriptor
+        There used to be a check to verify config parameter values in cm-state (cm-state/cm-nsr/cm-vnfr/config-parameter).
+        Recently that got removed due to confd issue. So, there is no such check currently for cm-state.
+        """
+        nsr_cfg, _ = list(yield_nsrc_nsro_pairs(proxy))[0]
+        con_nsr_xpath = "/rw-project:project[rw-project:name='default']/cm-state/cm-nsr[id={}]".format(quoted_key(nsr_cfg.id))
+
+        pong_source_map, ping_request_map = None, None
+
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            # Get cm-state for this vnfr
+            con_vnfr_path = con_nsr_xpath + "/cm-vnfr[id={}]".format(quoted_key(vnfr.id))
+            con_data = proxy(RwConmanYang).get(con_vnfr_path)
+
+            # Match various config parameter sources with config primitive parameters
+            for config_primitive in vnfr.vnf_configuration.config_primitive:
+                if config_primitive.name in ("config", "start-stop"):
+                    for parameter in config_primitive.parameter:
+                        if parameter.name == 'mgmt_ip':
+                            assert parameter.default_value == vnfr.mgmt_interface.ip_address
+                        if parameter.name == 'mgmt_port':
+                            assert parameter.default_value == str(vnfd.mgmt_interface.port)
+                        if parameter.name == 'username':
+                            assert parameter.default_value == 'admin'
+
+                # Fetch the source parameter values from pong vnf and request parameter values from ping vnf
+                if config_primitive.name == "config":
+                    if vnfd.name == "pong_vnfd":
+                        pong_source_map = [parameter.default_value for parameter in config_primitive.parameter if
+                                           parameter.name in ("service_ip", "service_port")]
+                    if vnfd.name == "ping_vnfd":
+                        ping_request_map = [parameter.default_value for parameter in config_primitive.parameter if
+                                            parameter.name in ("pong_ip", "pong_port")]
+        assert pong_source_map
+        assert ping_request_map
+        # Match the config-parameter-map defined in NS descriptor
+        assert sorted(pong_source_map) == sorted(ping_request_map)
+
+    @pytest.mark.skipif(not pytest.config.getoption("--port-security"), reason="need --port-security option to run")
+    def test_port_security(self, proxy, vim_clients, cloud_account_name):
+        """
+        Asserts:
+        1. port-security-enabled match in vnfd and vnfr
+        2. Get port property from openstack. Match these attributes: 'port_security_enabled', 'security_groups'
+        """
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            assert vnfd.connection_point[1].port_security_enabled == vnfr.connection_point[1].port_security_enabled
+
+            xpath = "/rw-project:project[rw-project:name='default']/vlr-catalog/vlr[id={}]".format(quoted_key(vnfr.connection_point[1].vlr_ref))
+            vlr = proxy(RwVlrYang).get(xpath)
+
+            vim_client = vim_clients[cloud_account_name]
+            # Locate the neutron port on the VLR's network whose name matches this connection point.
+            port = [port for port in vim_client.neutron_port_list() if port['network_id'] == vlr.network_id if
+                    port['name'] == vnfr.connection_point[1].name]
+            assert port
+
+            port_openstack = port[0]
+            assert vnfr.connection_point[1].port_security_enabled == port_openstack['port_security_enabled']
+
+            if vnfr.connection_point[1].port_security_enabled:
+                assert port_openstack['security_groups'] # It has to carry at least one security group if enabled
+            else:
+                assert not port_openstack['security_groups']
+
+    @pytest.mark.skipif(not pytest.config.getoption("--port-sequencing"), reason="need --port-sequencing option to run")
+    def test_explicit_port_sequencing(self, proxy, vim_clients, cloud_account_name, logger, port_sequencing_intf_positions, iteration):
+        """
+        Asserts:
+        1. Interface count match in vnfd and vnfr
+        2. Get interface ordering(mac address) from VM using 'ip a' command; From output of neutron port-list, get
+        corresponding connection point names in the same order as mac address ordered list.
+        3. Get interface ordering from the vnfd/vdu
+        4. Compare lists from step-2 and step-3
+        """
+        username, password = ['fedora']*2
+
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            assert len(vnfd.vdu[0].interface) == len(vnfr.vdur[0].interface)
+
+            logger.debug('Interface details for vnfd {}: {}'.format(vnfd.name, vnfd.vdu[0].as_dict()['interface']))
+
+            if iteration==1:
+                tmp_positional_values_list = []
+                for intf in vnfr.vdur[0].interface:
+                    # if no position is specified for an interface, then vnfr/vdur/interface carries 0 as its positional value
+                    if intf.position!=0:
+                        tmp_positional_values_list.append(intf.position)
+                if 'ping' in vnfd.name:
+                    assert not tmp_positional_values_list
+                if 'pong' in vnfd.name:
+                    assert set(tmp_positional_values_list) == set(port_sequencing_intf_positions)
+
+            # Get a sorted list of interfaces from vnfd/vdu
+            # Explicitly-positioned interfaces come first (ordered by position),
+            # followed by the unpositioned ones in name-sorted order.
+            icp_key_name, ecp_key_name = 'internal_connection_point_ref', 'external_connection_point_ref'
+            intf_with_position_field_dict, intf_without_position_field_list = {}, []
+
+            for intf in vnfd.vdu[0].interface:
+                intf = intf.as_dict()
+                cp_ref_key = icp_key_name if icp_key_name in intf else ecp_key_name
+                if 'position' in intf:
+                    intf_with_position_field_dict[intf['position']] = intf[cp_ref_key]
+                else:
+                    intf_without_position_field_list.append(intf[cp_ref_key])
+
+            intf_with_position_field_list = sorted(intf_with_position_field_dict.items(), key=operator.itemgetter(0))
+            sorted_cp_names_in_vnfd = [pos_cpname_tuple[1] for pos_cpname_tuple in intf_with_position_field_list] + \
+                                      sorted(intf_without_position_field_list)
+
+            # Establish a ssh session to VDU to get mac address list sorted by interfaces
+            ssh_session = SshSession(vnfr.vdur[0].management_ip)
+            assert ssh_session
+            assert ssh_session.connect(username=username, password=password)
+            e_code, ip_output, err = ssh_session.run_command('sudo ip a')
+            assert e_code == 0
+            logger.debug('Output of "ip a": {}'.format(ip_output))
+            mac_addr_list = re.findall(r'link/ether\s+(.*)\s+brd', ip_output)
+
+            # exclude eth0 as it is always a mgmt-interface
+            interface_starting_index = len(mac_addr_list) - len(vnfd.vdu[0].interface)
+            mac_addr_list = mac_addr_list[interface_starting_index: ]
+
+            # Get neutron port list
+            neutron_port_list = vim_clients[cloud_account_name].neutron_port_list()
+
+            # Get those ports whose mac_address value matches with one of the mac addresses in mac_addr_list
+            # This new list is already sorted as the outer loop iterates over mac_addr_list
+            sorted_cp_names_in_vm = [neutron_port_dict['name'] for mac in mac_addr_list for neutron_port_dict in neutron_port_list
+                                     if mac==neutron_port_dict['mac_address']]
+
+            logger.debug('Sorted connection points as per "ip a" in VM: {}'.format(sorted_cp_names_in_vm))
+            logger.debug('Sorted connection points as per ordering mentioned in vnfd: {}'.format(sorted_cp_names_in_vnfd))
+
+            assert sorted_cp_names_in_vm == sorted_cp_names_in_vnfd
+
+    @pytest.mark.skipif(
+        not (pytest.config.getoption("--vnf-dependencies") and
+            pytest.config.getoption("--service-primitive")),
+        reason="need --vnf-dependencies and --service-primitive option to run")
+    def test_primitives(
+            self, mgmt_session, cloud_module, cloud_account, descriptors,
+            fmt_nsd_catalog_xpath, logger):
+        """Testing service primitives and config primitives.
+
+        Fires one NS-level service primitive ('primitive_test') and one
+        VNF-level config primitive ('start-stop'), then polls the NSR's
+        config-agent job list until both jobs report success or 60s elapse.
+
+        Raises:
+            JobStatusError: if either job reports 'failure' or neither
+                reaches 'success' within the timeout.
+        """
+        # Create a cloud account
+        rift.auto.mano.create_cloud_account(
+            mgmt_session, cloud_account, "default")
+
+        rwnsr_pxy = mgmt_session.proxy(RwNsrYang)
+        nsr_pxy = mgmt_session.proxy(NsrYang)
+        rwvnfr_pxy = mgmt_session.proxy(RwVnfrYang)
+
+        # Testing a custom service primitive
+        ns_opdata = rwnsr_pxy.get(
+            '/rw-project:project[rw-project:name="default"]' +
+            '/ns-instance-opdata/nsr'
+        )
+        nsr_id = ns_opdata.ns_instance_config_ref
+        sp_rpc_input = NsrYang.YangInput_Nsr_ExecNsServicePrimitive.from_dict(
+            {'name': 'primitive_test', 'nsr_id_ref': nsr_id})
+        nsr_pxy.rpc(sp_rpc_input)
+
+        # Testing a config primitive
+        vnfr_catalog = rwvnfr_pxy.get(
+            '/rw-project:project[rw-project:name="default"]' +
+            '/vnfr-catalog'
+        )
+        cp_rpc_input = NsrYang.YangInput_Nsr_ExecNsServicePrimitive.from_dict(
+            {'nsr_id_ref': nsr_id})
+        vnf_list = cp_rpc_input.create_vnf_list()
+        vnf_primitive = vnf_list.create_vnf_primitive()
+        vnf_primitive.index = 1
+        vnf_primitive.name = "start-stop"
+        vnf_list.member_vnf_index_ref = (
+            vnfr_catalog.vnfr[0].member_vnf_index_ref
+        )
+        vnf_list._set_vnfr_id_ref(vnfr_catalog.vnfr[0].id)
+        vnf_list.vnf_primitive.append(vnf_primitive)
+        cp_rpc_input.vnf_list.append(vnf_list)
+        nsr_pxy.rpc(cp_rpc_input)
+        # Checking nsd joblist to see if both tests passed
+
+        def check_job_status(status=None):
+            # Returns True when both config-agent jobs report success;
+            # raises JobStatusError immediately on a 'failure' status.
+            ns_opdata = rwnsr_pxy.get(
+                '/rw-project:project[rw-project:name="default"]' +
+                '/ns-instance-opdata/nsr'
+            )
+            counter = 0
+            counter_limit = 2
+            for idx in range(0, counter_limit):
+                if ns_opdata.config_agent_job[idx].job_status == 'failure':
+                    err_msg = (
+                        'Service primitive test failed.' +
+                        ' The config agent reported failure job status')
+                    raise JobStatusError(err_msg)
+
+                elif ns_opdata.config_agent_job[idx].job_status == 'success':
+                    counter += 1
+                    continue
+
+            if counter == counter_limit:
+                return True
+            else:
+                time.sleep(5)
+                return False
+
+        start_time = time.time()
+        while (time.time() - start_time < 60):
+            status = check_job_status()
+            if status:
+                break
+        # while/else: runs only if the loop finished without break, i.e. 60s timeout.
+        else:
+            err_msg = (
+                'Service primitive test failed. Timed out: 60 seconds' +
+                'The config agent never reached a success status')
+            raise JobStatusError(err_msg)
+
+    @pytest.mark.skipif(
+        not (pytest.config.getoption("--metadata-vdud") or pytest.config.getoption("--metadata-vdud-cfgfile")),
+        reason="need --metadata-vdud or --metadata-vdud-cfgfile option to run")
+    def test_metadata_vdud(self, logger, proxy, vim_clients, cloud_account_name, metadata_host):
+        """
+        Asserts:
+        1. content of supplemental-boot-data match in vnfd and vnfr
+        vnfr may carry extra custom-meta-data fields (e.g pci_assignement) which are by default enabled during VM creation by openstack.
+        vnfr doesn't carry config_file details; so that will be skipped during matching.
+        2. boot-data-drive match with openstack VM's config_drive attribute
+        3. For each VDUD which have config-file fields mentioned, check if there exists a path in the VM which
+        matches with config-file's dest field. (Only applicable for cirros_cfgfile_vnfd VNF RIFT-15524)
+        4. For each VDUD, match its custom-meta-data fields with openstack VM's properties field
+        """
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            # Pick ssh credentials from the guest image implied by the VNFD name.
+            if any(name in vnfd.name for name in ['ping', 'pong', 'fedora']):
+                username, password = ['fedora'] * 2
+            elif 'ubuntu' in vnfd.name:
+                username, password = ['ubuntu'] * 2
+            elif 'cirros' in vnfd.name:
+                username, password = 'cirros', 'cubswin:)'
+            else:
+                assert False, 'Not expected to use this VNFD {} in this systemtest. VNFD might have changed. Exiting the test.'.format(
+                    vnfd.name)
+
+            # Wait till VNF's operational-status becomes 'running'
+            # The below check is usually covered as part of test_wait_for_ns_configured
+            # But, this is mostly needed when non- ping pong packages are used e.g cirrus cfgfile package
+            xpath = "/rw-project:project[rw-project:name='default']/vnfr-catalog/vnfr[id={}]/operational-status".format(quoted_key(vnfr.id))
+            proxy(VnfrYang).wait_for(xpath, "running", timeout=300)
+            time.sleep(5)
+
+            # Get the VDU details from openstack
+            vim_client = vim_clients[cloud_account_name]
+            vm_property = vim_client.nova_server_get(vnfr.vdur[0].vim_id)
+            logger.info('VM property for {}: {}'.format(vnfd.name, vm_property))
+
+            # Establish a ssh session to VDU
+            ssh_session = SshSession(vnfr.vdur[0].management_ip)
+            assert ssh_session
+            assert ssh_session.connect(username=username, password=password)
+
+            assert vnfd.vdu[0].supplemental_boot_data.boot_data_drive == vnfr.vdur[
+                0].supplemental_boot_data.boot_data_drive == bool(vm_property['config_drive'])
+            # Using bool() because vm_property['config_drive'] returns 'True' or '' whereas vnfr/vnfd returns True/False
+
+            # Assert 3: only for cirros vnf
+            if 'cirros' in vnfd.name:
+                for config_file in vnfd.vdu[0].supplemental_boot_data.config_file:
+                    assert ssh_session.run_command('test -e {}'.format(config_file.dest))[0] == 0
+
+            vdur_metadata = {metadata.name: metadata.value for metadata in
+                             vnfr.vdur[0].supplemental_boot_data.custom_meta_data}
+
+            # Get the user-data/metadata from VM
+            e_code, vm_metadata, _ = ssh_session.run_command(
+                'curl http://{}/openstack/latest/meta_data.json'.format(metadata_host))
+            assert e_code == 0
+            vm_metadata = json.loads(vm_metadata)['meta']
+            logger.debug('VM metadata for {}: {}'.format(vnfd.name, vm_metadata))
+
+            # Assert 4: every VDUD metadata entry must appear, unchanged, in both the VNFR and the VM.
+            for vdud_metadata in vnfd.vdu[0].supplemental_boot_data.custom_meta_data:
+                assert vdud_metadata.value == vdur_metadata[vdud_metadata.name]
+                assert vdud_metadata.value == vm_metadata[vdud_metadata.name]
+
+    @pytest.mark.skipif(not pytest.config.getoption("--multidisk"), reason="need --multidisk option to run")
+    def test_multidisk(self, logger, proxy, vim_clients, cloud_account_name, multidisk_testdata):
+        """
+        This feature is only supported in openstack, brocade vCPE.
+        Asserts:
+        1. volumes match in vnfd and vnfr
+        2. volumes match in vnfr and openstack host
+        Check no of volumes attached to the VNF VM. It should match no of volumes defined in VDUD.
+        Match volume names. In 'openstack volume show <vol_uuid>', the device should be /dev/<volume_name_in_vdud>
+        Match the volume source.
+        Match the volume size.
+        Match the Volume IDs mentioned in VNFR with openstack volume's ID.
+        """
+        ping_test_data, pong_test_data = multidisk_testdata
+        # Index i in vol_attr maps to position i of each volume's test-data tuple;
+        # slot 1 (device_bus) is None because it never appears in vnfr/vdur.
+        vol_attr = ['device_type', None, 'size', 'image', 'boot_priority']
+        # device_bus doesn't appear in vnfr/vdur
+
+        for vnfd, vnfr in yield_vnfd_vnfr_pairs(proxy):
+            logger.info('Verifying VNF {}'.format(vnfd.name))
+            vnf_testdata = ping_test_data if 'ping' in vnfd.name else pong_test_data
+
+            # Assert 1: Match volumes in vnfd, vnfr, test data
+            assert len(vnfd.vdu[0].volumes) == len(vnfr.vdur[0].volumes)
+
+            for vnfr_vol in vnfr.vdur[0].volumes:
+                logger.info('Verifying vnfr volume: {}'.format(vnfr_vol.as_dict()))
+                vnfd_vol = [vol for vol in vnfd.vdu[0].volumes if vol.name==vnfr_vol.name][0]
+
+                vol_testdata = vnf_testdata[vnfr_vol.name]
+
+                for i, attr in enumerate(vol_attr):
+                    if attr == None: # device_bus doesn't appear in vnfr/vdur
+                        continue
+                    if i == 3 and (vol_testdata[i]==None or getattr(vnfd_vol, 'ephemeral')):
+                        # volume source of type ephemeral doesn't appear in vnfr/vdur
+                        # If no image is defined for a volume, getattr(vnfr_vol, 'ephemeral') returns False. Strange. RIFT-15165
+                        assert not getattr(vnfd_vol, 'image')
+                        continue
+
+                    assert getattr(vnfd_vol, attr) == getattr(vnfr_vol, attr)
+                    if vol_testdata[i] is not None:
+                        assert getattr(vnfd_vol, attr) == vol_testdata[i]
+
+            # Assert 2: Volumes match in vnfr and openstack host
+            # Get VM properties from the VIM
+            vim_client = vim_clients[cloud_account_name]
+            vm_property = vim_client.nova_server_get(vnfr.vdur[0].vim_id)
+            logger.info('VIM- VM properties: {}'.format(vm_property))
+
+            # Get the volumes attached to this VNF VM
+            vim_volumes = vm_property['os-extended-volumes:volumes_attached']
+            logger.info('VIM- Volumes attached to this VNF VM: {}'.format(vim_volumes))
+
+            assert vim_volumes
+            assert len(vim_volumes) == len(vnfr.vdur[0].volumes)
+
+            vnfr_volumes_by_id = {vol.volume_id:vol for vol in vnfr.vdur[0].volumes}
+            for vim_volume in vim_volumes:
+                # Match the Volume IDs mentioned in VNFR with openstack volume's ID.
+                logger.info('Verifying volume: {}'.format(vim_volume['id']))
+                assert vim_volume['id'] in vnfr_volumes_by_id.keys()
+                vnfr_vol_ = vnfr_volumes_by_id[vim_volume['id']]
+
+                # Get volume details. Equivalent cli: openstack volume show <uuid>
+                vim_vol_attrs = vim_client.cinder_volume_get(vim_volume['id'])
+
+                # Match volume size
+                assert vnfr_vol_.size == vim_vol_attrs.size
+
+                # Match volume source
+                if vnfr_vol_.image: # To make sure this is not ephemeral type
+                    logger.info('VIM- Image details of the volume: {}'.format(vim_vol_attrs.volume_image_metadata))
+                    assert vnfr_vol_.image == vim_vol_attrs.volume_image_metadata['image_name']
+                else:
+                    assert not hasattr(vim_vol_attrs, 'volume_image_metadata')
+
+                # Match volume name e.g 'device': u'/dev/vdf'
+                logger.info('Verifying [{}] in attached volumes {}'.format(vnfr_vol_.name, vim_vol_attrs.attachments))
+                assert [attachment for attachment in vim_vol_attrs.attachments if vnfr_vol_.name in attachment['device']]
+
+    @pytest.mark.skipif(not pytest.config.getoption("--l2-port-chaining"), reason="need --l2-port-chaining option to run")
+    def test_l2_port_chaining(self, proxy):
+        """
+        It uses existing NS, VNF packages: $RIFT_INSTALL/usr/rift/mano/nsds/vnffg_demo_nsd/vnffg_l2portchain_*.
+        This test function is specific to these packages. Those VNFs use Ubuntu trusty image ubuntu_trusty_1404.qcow2.
+        Asserts:
+        1. Count of VNFFG in nsd and nsr
+        2. Count of rsp, classifier in VNFFG descriptor and VNFFG record
+        3. Need details what other fields need to be matched in nsd and nsr
+        4. Traffic flows through internal hops as per the classifier and rsp
+        As per the classifiers in NS package, the following flows will be tested.
+        - Tcp packets with dest port 80 starting from pgw VNF should go through Firewall VNF.
+        - Udp packets with source port 80 starting from router VNF should go through nat->dpi
+        - Udp packets with dest port 80 starting from pgw VNF should go through dpi->nat
+
+        """
+        # IANA protocol numbers used when matching captured packets.
+        UDP_PROTOCOL, TCP_PROTOCOL = 17, 6
+
+        def pcap_analysis(pcap_file, src_add, dst_add, src_port=None, dst_port=None, protocol=6):
+            """Analyse packets in a pcap file and return a match w.r.t src_add, dst_add, protocol.
+            Args:
+                pcap_file: pcap file that is generated by traffic analysis utility such as tcpdump
+                src_add, dst_add: Source & dest IP which need to be matched for a packet
+                src_port, dst_port: Optional L4 ports which, when given, must also match
+                protocol: Protocol that needs to be matched for a packet which already matched src_add, dst_add (protocol accepts integer e.g TCP 6, UDP 17)
+
+            Returns:
+                timestamp of the packet which is matched (Needed to check packet flow order through VNFs)
+                or
+                False: if there is no packet match
+
+            It uses scapy module to analyse pcap file. pip3 install scapy-python3
+            Other options https://pypi.python.org/pypi/pypcapfile
+            """
+            assert os.path.exists(pcap_file)
+            # scapy layer class used for port matching: 6 -> TCP, anything else -> UDP.
+            pkt_type = TCP if protocol==6 else UDP
+
+            pcap_obj = rdpcap(pcap_file)
+            for pkt in pcap_obj:
+                if IP in pkt:
+                    if not(pkt[IP].src==src_add and pkt[IP].dst==dst_add and pkt[IP].proto==protocol):
+                        continue
+                    # Only packets that also carry the expected L4 layer can match on ports.
+                    if pkt_type in pkt:
+                        if src_port:
+                            if not (pkt[pkt_type].sport==src_port):
+                                continue
+                        if dst_port:
+                            if not (pkt[pkt_type].dport==dst_port):
+                                continue
+                        # First matching packet wins; its capture timestamp orders the hops.
+                        return pkt[IP].time
+            return False
+
+        # Check the VNFFG in nsd and nsr
+        for nsd, nsr in yield_nsd_nsr_pairs(proxy):
+            vnffgds = nsd.vnffgd
+            vnffgrs = nsr.vnffgr
+            assert len(vnffgds) == len(vnffgrs)
+
+            # Check the classifier, rsp in nsd and nsr
+            for vnffgd in vnffgds:
+                # Pair each descriptor with its record via vnffgd_id_ref.
+                vnffgr = [vnffgr for vnffgr in vnffgrs if vnffgd.id == vnffgr.vnffgd_id_ref][0]
+                assert len(vnffgd.rsp) == len(vnffgr.rsp)
+                assert len(vnffgd.classifier) == len(vnffgr.classifier)
+
+        vnfrs = proxy(RwVnfrYang).get('/rw-project:project[rw-project:name="default"]/vnfr-catalog/vnfr', list_obj=True)
+
+        # Get the IP of VMs
+        # NOTE(review): VNFs are identified by substring match on the VNFR name; assumes
+        # exactly one VNFR name contains each of these tokens — confirm for the NS package.
+        vm_names = ('router', 'firewall', 'dpi', 'nat', 'pgw')
+        vm_ips = {vm_name: vnfr.vdur[0].vm_management_ip for vm_name in vm_names for vnfr in vnfrs.vnfr if
+                  vm_name in vnfr.name}
+        vm_cp_ips = {vm_name: vnfr.connection_point[0].ip_address for vm_name in vm_names for vnfr in vnfrs.vnfr if
+                     vm_name in vnfr.name}
+
+        # Establish Ssh sessions to the VMs
+        ssh_sessions = {}
+        for vm_name, vm_ip in vm_ips.items():
+            ssh_session = SshSession(vm_ip)
+            assert ssh_session
+            assert ssh_session.connect(username='ubuntu', password='ubuntu')
+            ssh_sessions[vm_name] = ssh_session
+
+        # Start python's SimpleHTTPServer on port 80 in the router VM
+        e_code, _, _ = ssh_sessions['router'].run_command('sudo python -m SimpleHTTPServer 80', max_wait=5)
+        assert e_code is None  # Due to blocking call, it should timeout and return 'None' as exit code
+
+
+        # Check: Tcp packets with dest port 80 starting from pgw VNF should go through Firewall VNF.
+        pcap_file = 'l2test_firewall.pcap'
+        # Start tcpdump in firewall vnf and start sending tcp packets from pgw vnf.
+        # tcpdump is backgrounded; the "sleep N; kill $!" pair bounds the capture window.
+        e_code, _, _ = ssh_sessions['firewall'].run_command(
+            'sudo tcpdump -i eth1 -w {pcap} & sleep 10; sudo kill $!'.format(pcap=pcap_file), max_wait=4)
+        e_code, _, _ = ssh_sessions['pgw'].run_command('sudo nc {router_ip} 80 -w 0'.format(router_ip=vm_cp_ips['router']))
+
+        # Copy pcap file from firewall vnf for packet analysis
+        time.sleep(10)
+        assert ssh_sessions['firewall'].get(pcap_file, pcap_file)
+        assert pcap_analysis(pcap_file, vm_cp_ips['pgw'], vm_cp_ips['router'], dst_port=80, protocol=TCP_PROTOCOL)
+
+
+        # Check: Udp packets with source port 80 starting from router VNF should go through nat->dpi
+        pcap_nat = 'l2test_nat1.pcap'
+        pcap_dpi = 'l2test_dpi1.pcap'
+        # Start tcpdump in nat, dpi vnf and start sending udp packets from router vnf
+        e_code, _, _ = ssh_sessions['nat'].run_command(
+            'sudo tcpdump -i eth1 -w {pcap} & sleep 15; sudo kill $!'.format(pcap=pcap_nat), max_wait=4)
+        e_code, _, _ = ssh_sessions['dpi'].run_command(
+            'sudo tcpdump -i eth1 -w {pcap} & sleep 10; sudo kill $!'.format(pcap=pcap_dpi), max_wait=4)
+        e_code, _, _ = ssh_sessions['router'].run_command(
+            'echo -n "hello" | sudo nc -4u {pgw_ip} 1000 -s {router_ip} -p 80 -w 0'.format(pgw_ip=vm_cp_ips['pgw'],
+                                                                                          router_ip=vm_cp_ips[
+                                                                                              'router']))
+
+        # Copy pcap file from nat, dpi vnf for packet analysis
+        time.sleep(10)
+        assert ssh_sessions['nat'].get(pcap_nat, pcap_nat)
+        assert ssh_sessions['dpi'].get(pcap_dpi, pcap_dpi)
+        # Capture timestamps at each hop establish the traversal order of the chain.
+        packet_ts_nat = pcap_analysis(pcap_nat, vm_cp_ips['router'], vm_cp_ips['pgw'], src_port=80, protocol=UDP_PROTOCOL)
+        packet_ts_dpi = pcap_analysis(pcap_dpi, vm_cp_ips['router'], vm_cp_ips['pgw'], src_port=80, protocol=UDP_PROTOCOL)
+        assert packet_ts_nat
+        assert packet_ts_dpi
+        assert packet_ts_nat < packet_ts_dpi  # Packet flow must follow nat -> dpi
+
+
+        # Check: Udp packets with dest port 80 starting from pgw VNF should go through dpi->nat
+        pcap_nat = 'l2test_nat2.pcap'
+        pcap_dpi = 'l2test_dpi2.pcap'
+        # Start tcpdump in nat, dpi vnf and start sending udp packets from pgw vnf
+        e_code, _, _ = ssh_sessions['nat'].run_command(
+            'sudo tcpdump -i eth1 -w {pcap} & sleep 15; sudo kill $!'.format(pcap=pcap_nat), max_wait=4)
+        e_code, _, _ = ssh_sessions['dpi'].run_command(
+            'sudo tcpdump -i eth1 -w {pcap} & sleep 10; sudo kill $!'.format(pcap=pcap_dpi), max_wait=4)
+        e_code, _, _ = ssh_sessions['pgw'].run_command(
+            'echo -n "hello" | sudo nc -4u {router_ip} 80 -w 0'.format(router_ip=vm_cp_ips['router']))
+
+        # Copy pcap file from nat, dpi vnf for packet analysis
+        time.sleep(10)
+        assert ssh_sessions['nat'].get(pcap_nat, pcap_nat)
+        assert ssh_sessions['dpi'].get(pcap_dpi, pcap_dpi)
+        packet_ts_nat = pcap_analysis(pcap_nat, vm_cp_ips['pgw'], vm_cp_ips['router'], dst_port=80, protocol=UDP_PROTOCOL)
+        packet_ts_dpi = pcap_analysis(pcap_dpi, vm_cp_ips['pgw'], vm_cp_ips['router'], dst_port=80, protocol=UDP_PROTOCOL)
+        assert packet_ts_nat
+        assert packet_ts_dpi
+        # The below assert used to fail while testing. ts_dpi is ahead of ts_nat in few microseconds
+        # Need to confirm if that's expected
+        assert packet_ts_dpi < packet_ts_nat  # Packet flow must follow dpi -> nat
 @pytest.mark.depends('nsr')
 @pytest.mark.setup('nfvi')
 @pytest.mark.incremental
 class TestNfviMetrics(object):
+    # NOTE(review): skipif(True) disables this test unconditionally; re-enable once
+    # NFVI metrics collection is restored (tracked by RIFT-15789).
+    @pytest.mark.skipif(True, reason='NFVI metrics are disabled - RIFT-15789')
     def test_records_present(self, proxy):
         assert_records(proxy)
 @pytest.mark.depends('nfvi')
 @pytest.mark.incremental
+@pytest.mark.skipif(pytest.config.getoption("--port-sequencing"), reason="Skip this for port-sequencing test")
 class TestRecordsDescriptors:
-    def test_create_update_vnfd(self, proxy, updated_ping_pong_records):
+    def test_create_update_vnfd(self, proxy, updated_ping_pong_descriptors):
         """
         Verify VNFD related operations

         Asserts:
             If a VNFD record is created
         """
-        ping_vnfd, pong_vnfd, _ = updated_ping_pong_records
-        vnfdproxy = proxy(RwVnfdYang)
+        ping_vnfd, pong_vnfd, _ = updated_ping_pong_descriptors
+        # Catalog paths are now project-scoped, so use the project-aware proxy.
+        vnfdproxy = proxy(RwProjectVnfdYang)

-        for vnfd_record in [ping_vnfd, pong_vnfd]:
-            xpath = "/vnfd-catalog/vnfd"
-            vnfdproxy.create_config(xpath, vnfd_record.vnfd)
+        for vnfd in [ping_vnfd, pong_vnfd]:
+            xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd"
+            vnfdproxy.create_config(xpath, vnfd)

-            xpath = "/vnfd-catalog/vnfd[id='{}']".format(vnfd_record.id)
-            vnfd = vnfdproxy.get(xpath)
-            assert vnfd.id == vnfd_record.id
+            # quoted_key escapes the id for use as a YANG list-key predicate.
+            xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd.id))
+            updated_vnfd = vnfdproxy.get(xpath)
+            assert updated_vnfd.id == vnfd.id

-            vnfdproxy.replace_config(xpath, vnfd_record.vnfd)
+            vnfdproxy.replace_config(xpath, vnfd)

-    def test_create_update_nsd(self, proxy, updated_ping_pong_records):
+    def test_create_update_nsd(self, proxy, updated_ping_pong_descriptors):
         """
         Verify NSD related operations

         Asserts:
             If NSD record was created
         """
-        _, _, ping_pong_nsd = updated_ping_pong_records
-        nsdproxy = proxy(NsdYang)
+        _, _, ping_pong_nsd = updated_ping_pong_descriptors
+        # Project-scoped NSD catalog proxy (mirrors the VNFD test above).
+        nsdproxy = proxy(RwProjectNsdYang)

-        xpath = "/nsd-catalog/nsd"
-        nsdproxy.create_config(xpath, ping_pong_nsd.descriptor)
+        xpath = "/rw-project:project[rw-project:name='default']/nsd-catalog/nsd"
+        nsdproxy.create_config(xpath, ping_pong_nsd)

-        xpath = "/nsd-catalog/nsd[id='{}']".format(ping_pong_nsd.id)
+        # quoted_key escapes the id for use as a YANG list-key predicate.
+        xpath = "/rw-project:project[rw-project:name='default']/nsd-catalog/nsd[id={}]".format(quoted_key(ping_pong_nsd.id))
         nsd = nsdproxy.get(xpath)
         assert nsd.id == ping_pong_nsd.id

-        nsdproxy.replace_config(xpath, ping_pong_nsd.descriptor)
+        nsdproxy.replace_config(xpath, ping_pong_nsd)