@brief Launchpad System Test
"""
+import gi
import json
import logging
import os
import pytest
-import shlex
import requests
+import shlex
import shutil
import subprocess
import tempfile
import time
import uuid
+import rift.auto.descriptor
import rift.auto.mano
import rift.auto.session
import rift.mano.examples.ping_pong_nsd as ping_pong
-import gi
gi.require_version('RwNsrYang', '1.0')
-gi.require_version('RwVnfdYang', '1.0')
+gi.require_version('RwProjectNsdYang', '1.0')
+gi.require_version('RwProjectVnfdYang', '1.0')
gi.require_version('RwLaunchpadYang', '1.0')
gi.require_version('RwBaseYang', '1.0')
from gi.repository import (
- NsdYang,
+ RwProjectNsdYang,
RwNsrYang,
RwVnfrYang,
NsrYang,
VnfrYang,
VldYang,
- RwVnfdYang,
+ RwProjectVnfdYang,
RwLaunchpadYang,
RwBaseYang
)
+gi.require_version('RwKeyspec', '1.0')
+from gi.repository.RwKeyspec import quoted_key
+
logging.basicConfig(level=logging.DEBUG)
@pytest.fixture(scope='module')
def vnfd_proxy(request, mgmt_session):
- return mgmt_session.proxy(RwVnfdYang)
+ return mgmt_session.proxy(RwProjectVnfdYang)
@pytest.fixture(scope='module')
def rwvnfr_proxy(request, mgmt_session):
@pytest.fixture(scope='module')
def nsd_proxy(request, mgmt_session):
- return mgmt_session.proxy(NsdYang)
+ return mgmt_session.proxy(RwProjectNsdYang)
@pytest.fixture(scope='module')
def rwnsr_proxy(request, mgmt_session):
class DescriptorOnboardError(Exception):
pass
-def create_nsr(nsd, input_param_list, cloud_account_name):
- """
- Create the NSR record object
-
- Arguments:
- nsd - NSD
- input_param_list - list of input-parameter objects
- cloud_account_name - name of cloud account
-
- Return:
- NSR object
- """
- nsr = RwNsrYang.YangData_Nsr_NsInstanceConfig_Nsr()
-
- nsr.id = str(uuid.uuid4())
- nsr.name = rift.auto.mano.resource_name(nsr.id)
- nsr.short_name = "nsr_short_name"
- nsr.description = "This is a description"
- nsr.nsd.from_dict(nsr.as_dict())
- nsr.admin_status = "ENABLED"
- nsr.input_parameter.extend(input_param_list)
- nsr.cloud_account = cloud_account_name
-
- return nsr
def upload_descriptor(logger, descriptor_file, host="127.0.0.1"):
curl_cmd = 'curl --insecure -F "descriptor=@{file}" https://{host}:4567/api/upload'.format(
return transaction_id
-def wait_onboard_transaction_finished(logger, transaction_id, timeout=30, host="127.0.0.1"):
+def wait_onboard_transaction_finished(logger, transaction_id, timeout=30, host="127.0.0.1", project="default"):
def check_status_onboard_status():
- uri = 'https://%s:4567/api/upload/%s/state' % (host, transaction_id)
+ uri = 'https://%s:8008/api/operational/project/%s/create-jobs/job/%s' % (host, project, transaction_id)
curl_cmd = 'curl --insecure {uri}'.format(uri=uri)
return subprocess.check_output(shlex.split(curl_cmd), universal_newlines=True)
"""
logger.debug("Terminating Ping Pong NSRs")
- nsr_path = "/ns-instance-config"
+ nsr_path = "/rw-project:project[rw-project:name='default']/ns-instance-config"
nsr = rwnsr_proxy.get_config(nsr_path)
nsrs = nsr.nsr
xpaths = []
for ping_pong in nsrs:
- xpath = "/ns-instance-config/nsr[id='{}']".format(ping_pong.id)
+ xpath = "/rw-project:project[rw-project:name='default']/ns-instance-config/nsr[id={}]".format(quoted_key(ping_pong.id))
rwnsr_proxy.delete_config(xpath)
xpaths.append(xpath)
assert nsr is None
# Get the ns-instance-config
- ns_instance_config = rwnsr_proxy.get_config("/ns-instance-config")
+ ns_instance_config = rwnsr_proxy.get_config("/rw-project:project[rw-project:name='default']/ns-instance-config")
# Termination tests
- vnfr = "/vnfr-catalog/vnfr"
+ vnfr = "/rw-project:project[rw-project:name='default']/vnfr-catalog/vnfr"
vnfrs = rwvnfr_proxy.get(vnfr, list_obj=True)
assert vnfrs is None or len(vnfrs.vnfr) == 0
- # nsr = "/ns-instance-opdata/nsr"
+ # nsr = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr"
# nsrs = rwnsr_proxy.get(nsr, list_obj=True)
# assert len(nsrs.nsr) == 0
"""Generates & On-boards the descriptors.
"""
temp_dirs = []
- catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ catalog = vnfd_proxy.get_config('/rw-project:project[rw-project:name="default"]/vnfd-catalog')
endpoint = "upload"
"""
scheme,
cert)
- catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ catalog = vnfd_proxy.get_config('/rw-project:project[rw-project:name="default"]/vnfd-catalog')
vnfds = catalog.vnfd
assert len(vnfds) == 2, "There should two vnfds"
assert "ping_vnfd" in [vnfds[0].name, vnfds[1].name]
def delete_vnfds():
- vnfds = vnfd_proxy.get("/vnfd-catalog/vnfd", list_obj=True)
+ vnfds = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
for vnfd_record in vnfds.vnfd:
- xpath = "/vnfd-catalog/vnfd[id='{}']".format(vnfd_record.id)
+ xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd_record.id))
vnfd_proxy.delete_config(xpath)
time.sleep(5)
- vnfds = vnfd_proxy.get("/vnfd-catalog/vnfd", list_obj=True)
+ vnfds = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
assert vnfds is None or len(vnfds.vnfd) == 0
scheme,
cert)
- catalog = nsd_proxy.get_config('/nsd-catalog')
+ catalog = nsd_proxy.get_config('/rw-project:project[rw-project:name="default"]/nsd-catalog')
nsds = catalog.nsd
assert len(nsds) == 1, "There should only be a single nsd"
assert nsds[0].name == "ping_pong_nsd"
# for temp_dir in temp_dirs:
# temp_dir.cleanup()
- def test_instantiate_ping_pong_nsr(self, logger, nsd_proxy, rwnsr_proxy, base_proxy, cloud_account):
+ def test_instantiate_ping_pong_nsr(self, logger, nsd_proxy, rwnsr_proxy, base_proxy, cloud_account, use_accounts):
def verify_input_parameters(running_config, config_param):
"""
config_param.value,
running_config.input_parameter))
- catalog = nsd_proxy.get_config('/nsd-catalog')
+ catalog = nsd_proxy.get_config('/rw-project:project[rw-project:name="default"]/nsd-catalog')
nsd = catalog.nsd[0]
input_parameters = []
- descr_xpath = "/nsd:nsd-catalog/nsd:nsd[nsd:id='%s']/nsd:vendor" % nsd.id
+ descr_xpath = "/rw-project:project/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id=%s]/project-nsd:vendor" % quoted_key(nsd.id)
descr_value = "automation"
in_param_id = str(uuid.uuid4())
- input_param_1 = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter(
+ input_param_1 = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_InputParameter(
xpath=descr_xpath,
value=descr_value)
input_parameters.append(input_param_1)
- nsr = create_nsr(nsd, input_parameters, cloud_account.name)
+ nsr_id = str(uuid.uuid4())
+ if use_accounts:
+ nsr = rift.auto.descriptor.create_nsr(
+ cloud_account.name,
+ nsr_id,
+ nsd,
+ input_param_list=input_parameters,
+ account=cloud_account.name,
+ nsr_id=nsr_id
+ )
+ else:
+ nsr = rift.auto.descriptor.create_nsr(
+ cloud_account.name,
+ nsr_id,
+ nsd,
+ input_param_list=input_parameters,
+ nsr_id=nsr_id
+ )
logger.info("Instantiating the Network Service")
- rwnsr_proxy.create_config('/ns-instance-config/nsr', nsr)
+ rwnsr_proxy.create_config('/rw-project:project[rw-project:name="default"]/ns-instance-config/nsr', nsr)
- nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata/nsr[ns-instance-config-ref="{}"]'.format(nsr.id))
+ nsr_opdata = rwnsr_proxy.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata/nsr[ns-instance-config-ref={}]'.format(quoted_key(nsr.id)))
assert nsr_opdata is not None
# Verify the input parameter configuration
- running_config = rwnsr_proxy.get_config("/ns-instance-config/nsr[id='%s']" % nsr.id)
+ running_config = rwnsr_proxy.get_config("/rw-project:project[rw-project:name='default']/ns-instance-config/nsr[id=%s]" % quoted_key(nsr.id))
for input_param in input_parameters:
verify_input_parameters(running_config, input_param)
def test_wait_for_pingpong_started(self, rwnsr_proxy):
- nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsr_opdata = rwnsr_proxy.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
nsrs = nsr_opdata.nsr
for nsr in nsrs:
- xpath = "/ns-instance-opdata/nsr[ns-instance-config-ref='{}']/operational-status".format(
- nsr.ns_instance_config_ref)
+ xpath = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/operational-status".format(
+ quoted_key(nsr.ns_instance_config_ref))
rwnsr_proxy.wait_for(xpath, "running", fail_on=['failed'], timeout=180)
def test_wait_for_pingpong_configured(self, rwnsr_proxy):
- nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsr_opdata = rwnsr_proxy.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
nsrs = nsr_opdata.nsr
for nsr in nsrs:
- xpath = "/ns-instance-opdata/nsr[ns-instance-config-ref='{}']/config-status".format(
- nsr.ns_instance_config_ref)
+ xpath = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format(
+ quoted_key(nsr.ns_instance_config_ref))
rwnsr_proxy.wait_for(xpath, "configured", fail_on=['failed'], timeout=450)
"""Generates & On-boards the descriptors.
"""
temp_dirs = []
- catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ catalog = vnfd_proxy.get_config('/rw-project:project[rw-project:name="default"]/vnfd-catalog')
endpoint = "update"
ping_vnfd, pong_vnfd, ping_pong_nsd = ping_pong_records
scheme,
cert)
- catalog = vnfd_proxy.get_config('/vnfd-catalog')
+ catalog = vnfd_proxy.get_config('/rw-project:project[rw-project:name="default"]/vnfd-catalog')
vnfds = catalog.vnfd
assert len(vnfds) == 2, "There should two vnfds"
assert "pong_vnfd" in [vnfds[0].name, vnfds[1].name]
def delete_nsds():
- nsds = nsd_proxy.get("/nsd-catalog/nsd", list_obj=True)
+ nsds = nsd_proxy.get("/rw-project:project[rw-project:name='default']/nsd-catalog/nsd", list_obj=True)
for nsd_record in nsds.nsd:
- xpath = "/nsd-catalog/nsd[id='{}']".format(nsd_record.id)
+ xpath = "/rw-project:project[rw-project:name='default']/nsd-catalog/nsd[id={}]".format(quoted_key(nsd_record.id))
nsd_proxy.delete_config(xpath)
time.sleep(5)
- nsds = nsd_proxy.get("/nsd-catalog/nsd", list_obj=True)
+ nsds = nsd_proxy.get("/rw-project:project[rw-project:name='default']/nsd-catalog/nsd", list_obj=True)
assert nsds is None or len(nsds.nsd) == 0
delete_nsds()
def delete_vnfds():
- vnfds = vnfd_proxy.get("/vnfd-catalog/vnfd", list_obj=True)
+ vnfds = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
for vnfd_record in vnfds.vnfd:
- xpath = "/vnfd-catalog/vnfd[id='{}']".format(vnfd_record.id)
+ xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd_record.id))
vnfd_proxy.delete_config(xpath)
time.sleep(5)
- vnfds = vnfd_proxy.get("/vnfd-catalog/vnfd", list_obj=True)
+ vnfds = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
assert vnfds is None or len(vnfds.vnfd) == 0
delete_vnfds()
scheme,
cert)
- catalog = nsd_proxy.get_config('/nsd-catalog')
+ catalog = nsd_proxy.get_config('/rw-project:project[rw-project:name="default"]/nsd-catalog')
nsds = catalog.nsd
assert len(nsds) == 1, "There should only be a single nsd"
assert nsds[0].name == "ping_pong_nsd"
# for temp_dir in temp_dirs:
# temp_dir.cleanup()
- def test_instantiate_ping_pong_nsr(self, logger, nsd_proxy, rwnsr_proxy, base_proxy, cloud_account):
+ def test_instantiate_ping_pong_nsr(self, logger, nsd_proxy, rwnsr_proxy, base_proxy, cloud_account, use_accounts):
def verify_input_parameters(running_config, config_param):
"""
Verify the configured parameter set against the running configuration
config_param.value,
running_config.input_parameter))
- catalog = nsd_proxy.get_config('/nsd-catalog')
+ catalog = nsd_proxy.get_config('/rw-project:project[rw-project:name="default"]/nsd-catalog')
nsd = catalog.nsd[0]
input_parameters = []
- descr_xpath = "/nsd:nsd-catalog/nsd:nsd[nsd:id='%s']/nsd:vendor" % nsd.id
+ descr_xpath = "/rw-project:project/project-nsd:nsd-catalog/project-nsd:nsd[project-nsd:id=%s]/project-nsd:vendor" % quoted_key(nsd.id)
descr_value = "automation"
in_param_id = str(uuid.uuid4())
- input_param_1 = NsrYang.YangData_Nsr_NsInstanceConfig_Nsr_InputParameter(
+ input_param_1 = NsrYang.YangData_RwProject_Project_NsInstanceConfig_Nsr_InputParameter(
xpath=descr_xpath,
value=descr_value)
input_parameters.append(input_param_1)
- nsr = create_nsr(nsd, input_parameters, cloud_account.name)
+ nsr_id = str(uuid.uuid4())
+ if use_accounts:
+ nsr = rift.auto.descriptor.create_nsr(
+ cloud_account.name,
+ nsr_id,
+ nsd,
+ input_param_list=input_parameters,
+ account=cloud_account.name,
+ nsr_id=nsr_id
+ )
+ else:
+ nsr = rift.auto.descriptor.create_nsr(
+ cloud_account.name,
+ nsr_id,
+ nsd,
+ input_param_list=input_parameters,
+ nsr_id=nsr_id
+ )
logger.info("Instantiating the Network Service")
- rwnsr_proxy.create_config('/ns-instance-config/nsr', nsr)
+ rwnsr_proxy.create_config('/rw-project:project[rw-project:name="default"]/ns-instance-config/nsr', nsr)
- nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata/nsr[ns-instance-config-ref="{}"]'.format(nsr.id))
+ nsr_opdata = rwnsr_proxy.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata/nsr[ns-instance-config-ref={}]'.format(quoted_key(nsr.id)))
assert nsr_opdata is not None
# Verify the input parameter configuration
- running_config = rwnsr_proxy.get_config("/ns-instance-config/nsr[id='%s']" % nsr.id)
+ running_config = rwnsr_proxy.get_config("/rw-project:project[rw-project:name='default']/ns-instance-config/nsr[id=%s]" % quoted_key(nsr.id))
for input_param in input_parameters:
verify_input_parameters(running_config, input_param)
def test_wait_for_pingpong_started(self, rwnsr_proxy):
- nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsr_opdata = rwnsr_proxy.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
nsrs = nsr_opdata.nsr
for nsr in nsrs:
- xpath = "/ns-instance-opdata/nsr[ns-instance-config-ref='{}']/operational-status".format(
- nsr.ns_instance_config_ref)
+ xpath = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/operational-status".format(
+ quoted_key(nsr.ns_instance_config_ref))
rwnsr_proxy.wait_for(xpath, "running", fail_on=['failed'], timeout=180)
def test_wait_for_pingpong_configured(self, rwnsr_proxy):
- nsr_opdata = rwnsr_proxy.get('/ns-instance-opdata')
+ nsr_opdata = rwnsr_proxy.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
nsrs = nsr_opdata.nsr
for nsr in nsrs:
- xpath = "/ns-instance-opdata/nsr[ns-instance-config-ref='{}']/config-status".format(
- nsr.ns_instance_config_ref)
+ xpath = "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format(
+ quoted_key(nsr.ns_instance_config_ref))
rwnsr_proxy.wait_for(xpath, "configured", fail_on=['failed'], timeout=450)
Asserts:
The records are deleted.
"""
- nsds = nsd_proxy.get("/nsd-catalog/nsd", list_obj=True)
+ nsds = nsd_proxy.get("/rw-project:project[rw-project:name='default']/nsd-catalog/nsd", list_obj=True)
for nsd in nsds.nsd:
- xpath = "/nsd-catalog/nsd[id='{}']".format(nsd.id)
+ xpath = "/rw-project:project[rw-project:name='default']/nsd-catalog/nsd[id={}]".format(quoted_key(nsd.id))
nsd_proxy.delete_config(xpath)
- nsds = nsd_proxy.get("/nsd-catalog/nsd", list_obj=True)
+ nsds = nsd_proxy.get("/rw-project:project[rw-project:name='default']/nsd-catalog/nsd", list_obj=True)
assert nsds is None or len(nsds.nsd) == 0
- vnfds = vnfd_proxy.get("/vnfd-catalog/vnfd", list_obj=True)
+ vnfds = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
for vnfd_record in vnfds.vnfd:
- xpath = "/vnfd-catalog/vnfd[id='{}']".format(vnfd_record.id)
+ xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd_record.id))
vnfd_proxy.delete_config(xpath)
- vnfds = vnfd_proxy.get("/vnfd-catalog/vnfd", list_obj=True)
+ vnfds = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
assert vnfds is None or len(vnfds.vnfd) == 0