4 # Copyright 2016-2017 RIFT.io Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
20 @author Varun Prasad (varun.prasad@riftio.com)
21 @brief Onboard descriptors
32 import requests_toolbelt
40 import rift
.auto
.session
41 import rift
.auto
.descriptor
43 gi
.require_version('RwNsrYang', '1.0')
44 gi
.require_version('RwProjectVnfdYang', '1.0')
45 gi
.require_version('RwLaunchpadYang', '1.0')
46 gi
.require_version('RwBaseYang', '1.0')
47 gi
.require_version('RwStagingMgmtYang', '1.0')
48 gi
.require_version('RwPkgMgmtYang', '1.0')
49 gi
.require_version('RwVlrYang', '1.0')
51 from gi
.repository
import (
68 gi
.require_version('RwKeyspec', '1.0')
69 from gi
.repository
.RwKeyspec
import quoted_key
71 logging
.basicConfig(level
=logging
.DEBUG
)
74 @pytest.fixture(scope
='module')
75 def vnfd_proxy(request
, mgmt_session
):
76 return mgmt_session
.proxy(RwProjectVnfdYang
)
78 @pytest.fixture(scope
='module')
79 def rwvnfr_proxy(request
, mgmt_session
):
80 return mgmt_session
.proxy(RwVnfrYang
)
82 @pytest.fixture(scope
='module')
83 def vld_proxy(request
, mgmt_session
):
84 return mgmt_session
.proxy(VldYang
)
87 @pytest.fixture(scope
='module')
88 def rwvlr_proxy(request
, mgmt_session
):
89 return mgmt_session
.proxy(RwVlrYang
)
92 @pytest.fixture(scope
='module')
93 def nsd_proxy(request
, mgmt_session
):
94 return mgmt_session
.proxy(RwProjectNsdYang
)
97 @pytest.fixture(scope
='module')
98 def rwnsr_proxy(request
, mgmt_session
):
99 return mgmt_session
.proxy(RwNsrYang
)
101 @pytest.fixture(scope
='module')
102 def base_proxy(request
, mgmt_session
):
103 return mgmt_session
.proxy(RwBaseYang
)
106 @pytest.fixture(scope
="module")
111 def upload_descriptor(
118 curl_cmd
= ('curl --cert {cert} --key {key} -F "descriptor=@{file}" -k '
119 '{scheme}://{host}:4567/api/{endpoint}'.format(
124 file=descriptor_file
,
128 logger
.debug("Uploading descriptor %s using cmd: %s", descriptor_file
, curl_cmd
)
129 stdout
= subprocess
.check_output(shlex
.split(curl_cmd
), universal_newlines
=True)
131 json_out
= json
.loads(stdout
)
132 transaction_id
= json_out
["transaction_id"]
134 return transaction_id
137 class DescriptorOnboardError(Exception):
141 def wait_onboard_transaction_finished(
150 logger
.info("Waiting for onboard trans_id %s to complete", transaction_id
)
151 uri
= '%s://%s:4567/api/%s/%s/state' % (scheme
, host
, endpoint
, transaction_id
)
155 while elapsed
< timeout
:
156 reply
= requests
.get(uri
, cert
=cert
, verify
=False)
158 if state
["status"] == "success":
160 if state
["status"] != "pending":
161 raise DescriptorOnboardError(state
)
164 elapsed
= time
.time() - start
167 if state
["status"] != "success":
168 raise DescriptorOnboardError(state
)
169 logger
.info("Descriptor onboard was successful")
172 def onboard_descriptor(host
, file_name
, logger
, endpoint
, scheme
, cert
):
173 """On-board/update the descriptor.
176 host (str): Launchpad IP
177 file_name (str): Full file path.
178 logger: Logger instance
179 endpoint (str): endpoint to be used for the upload operation.
182 logger
.info("Onboarding package: %s", file_name
)
183 trans_id
= upload_descriptor(
190 wait_onboard_transaction_finished(
199 def get_ns_cloud_resources(rwvnfr_proxy
, rwvlr_proxy
):
200 """Returns a collection of ports, networks, VMs used by this NS"""
201 ns_cloud_resources
= {'ports':[], 'vms':[], 'networks':[]}
203 # Get ports and VMs associated with each VNF
204 vnfrs
= rwvnfr_proxy
.get('/rw-project:project[rw-project:name="default"]/vnfr-catalog/vnfr', list_obj
=True)
205 for vnfr
in vnfrs
.vnfr
:
206 for cp
in vnfr
.connection_point
:
207 ns_cloud_resources
['ports'].append(cp
.connection_point_id
)
208 for vdur
in vnfr
.vdur
:
209 ns_cloud_resources
['vms'].append(vdur
.vim_id
)
211 # Get the network associated with each NS
212 vlrs
= rwvlr_proxy
.get('/rw-project:project[rw-project:name="default"]/vlr-catalog/vlr', list_obj
=True)
214 ns_cloud_resources
['networks'].append(vlr
.network_id
)
216 return ns_cloud_resources
219 @pytest.mark
.setup('nsr')
220 @pytest.mark
.depends('launchpad')
221 @pytest.mark
.incremental
222 class TestNsrStart(object):
223 """A brief overview of the steps performed.
224 1. Generate & on-board new descriptors
228 def test_upload_descriptors(
239 """Generates & On-boards the descriptors.
241 1. Request a staging area: RPC returns an endpoint and port
242 1. Upload the file to the endpoint, return the endpoint to download
243 2. Reconstruct the URL and trigger an RPC upload for the package.
245 # We are instantiating the NS twice in port-sequencing test. Seconds NS instantiation will be using already uploaded
246 # descriptors with updated interface positional values.
247 if iteration
==1 and pytest
.config
.getoption("--port-sequencing"):
251 for file_name
in descriptors
:
253 ip
= RwStagingMgmtYang
.YangInput_RwStagingMgmt_CreateStagingArea
.from_dict({
254 "package_type": "VNFD"})
256 if "nsd" in file_name
:
257 ip
.package_type
= "NSD"
259 data
= mgmt_session
.proxy(RwStagingMgmtYang
).rpc(ip
)
260 form
= requests_toolbelt
.MultipartEncoder(fields
={
261 'file': (os
.path
.basename(file_name
),
262 open(file_name
, 'rb'),
263 'application/octet-stream')
266 response
= requests
.post(
267 "{}://{}:{}/{}".format(
272 data
=form
.to_string(),
273 cert
=cert
, # cert is a tuple
275 headers
={"Content-Type": "multipart/form-data"})
277 resp
= json
.loads(response
.text
)
278 url
= "https://{}:{}{}".format(mgmt_session
.host
, data
.port
, resp
['path'])
280 ip
= RwPkgMgmtYang
.YangInput_RwPkgMgmt_PackageCreate
.from_dict({
281 "package_type": "VNFD",
285 if "nsd" in file_name
:
286 ip
.package_type
= "NSD"
288 # trigger the upload.
289 resp
= mgmt_session
.proxy(RwPkgMgmtYang
).rpc(ip
)
291 wait_onboard_transaction_finished(
296 host
=mgmt_session
.host
,
299 descriptor_vnfds
, descriptor_nsd
= descriptors
[:-1], descriptors
[-1]
301 catalog
= vnfd_proxy
.get_config('/rw-project:project[rw-project:name="default"]/vnfd-catalog')
302 actual_vnfds
= catalog
.vnfd
303 assert len(actual_vnfds
) == len(descriptor_vnfds
), \
304 "There should {} vnfds".format(len(descriptor_vnfds
))
306 catalog
= nsd_proxy
.get_config('/rw-project:project[rw-project:name="default"]/nsd-catalog')
307 actual_nsds
= catalog
.nsd
308 assert len(actual_nsds
) == 1, "There should only be a single nsd"
310 @pytest.mark
.skipif(not pytest
.config
.getoption('--upload-images-multiple-accounts'),
311 reason
="need --upload-images-multiple-accounts option to run")
312 def test_images_uploaded_multiple_accounts(self
, logger
, mgmt_session
, random_image_name
, cloud_accounts
, cal
):
313 image_mgmt_proxy
= mgmt_session
.proxy(RwImageMgmtYang
)
314 upload_jobs
= image_mgmt_proxy
.get('/rw-project:project[rw-project:name="default"]/upload-jobs')
315 logger
.info('Embedded image name(apart from ping pong Fedora images): {}'.format(random_image_name
))
316 for job
in upload_jobs
.job
:
317 assert image_mgmt_proxy
.wait_for('/rw-project:project[rw-project:name="default"]/upload-jobs/job[id={}]/status'.format(quoted_key(job
.id)), 'COMPLETED', timeout
=240)
318 assert len(job
.upload_tasks
) == len(cloud_accounts
)
319 for upload_task
in job
.upload_tasks
:
320 assert upload_task
.status
== 'COMPLETED'
322 assert len(upload_jobs
.job
) == 3
324 # Check whether images are present in VIMs
325 for account
in cloud_accounts
:
326 rc
, res
= cal
.get_image_list(RwcalYang
.YangData_RwProject_Project_CloudAccounts_CloudAccountList
.from_dict(account
.as_dict()))
327 assert rc
== RwTypes
.RwStatus
.SUCCESS
328 assert [image
for image
in res
.imageinfo_list
if image
.name
== random_image_name
]
330 @pytest.mark
.skipif(not pytest
.config
.getoption("--vnf-onboard-delete"), reason
="need --vnf-onboard-delete option to run")
331 def test_upload_delete_descriptors(self
, logger
, mgmt_session
, vnfd_proxy
, descriptors
, vnf_onboard_delete
):
332 """Randomly upload and delete VNFs. With each upload/delete, verify if the VNF
333 gets uploaded/deleted successfully.
335 xpath
= "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]"
336 iteration
, vnf_count
= map(int, vnf_onboard_delete
.split(','))
338 # Get the VNF paths to be used for onboarding
339 all_vnfs
= [pkg_path
for pkg_path
in descriptors
if '_nsd' not in os
.path
.basename(pkg_path
)]
340 if vnf_count
> len(all_vnfs
):
341 vnf_count
= len(all_vnfs
)
342 available_vnfs
= random
.sample(all_vnfs
, vnf_count
)
344 # Get the add, delete iterations
345 add_del_seq
= list(np
.random
.choice(['add', 'del'], iteration
))
346 random
.shuffle(add_del_seq
)
347 logger
.info('Vnf add-delete iteration sequence: {}'.format(add_del_seq
))
352 """Returns list of VNFDs"""
353 vnfd_obj
= vnfd_proxy
.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj
=True)
354 return vnfd_obj
.vnfd
if vnfd_obj
else []
358 vnf_path
, vnfd_id
= random
.choice(list(uploaded_vnfs
.items()))
359 logger
.info('Deleting VNF {} having id {}'.format(os
.path
.basename(vnf_path
), vnfd_id
))
360 vnfd_proxy
.delete_config(xpath
.format(quoted_key(vnfd_id
)))
361 uploaded_vnfs
.pop(vnf_path
)
362 available_vnfs
.append(vnf_path
)
363 assert not [vnfd
for vnfd
in get_vnfd_list() if vnfd
.id == vnfd_id
]
365 for op_type
in add_del_seq
:
373 if not available_vnfs
:
376 vnf_path
= random
.choice(available_vnfs
)
377 logger
.info('Adding VNF {}'.format(os
.path
.basename(vnf_path
)))
378 rift
.auto
.descriptor
.onboard(mgmt_session
, vnf_path
)
379 vnfs
= get_vnfd_list()
380 assert len(vnfs
) == len(uploaded_vnfs
) + 1
381 vnfd
= [vnfd
for vnfd
in vnfs
if vnfd
.id not in list(uploaded_vnfs
.values())]
382 assert len(vnfd
) == 1
385 assert vnfd
.connection_point
387 uploaded_vnfs
[vnf_path
] = vnfd
.id
388 available_vnfs
.remove(vnf_path
)
390 assert len(get_vnfd_list()) == len(uploaded_vnfs
)
391 logger
.info('Onboarded VNFs : {}'.format(uploaded_vnfs
))
393 assert len(available_vnfs
) + len(uploaded_vnfs
) == vnf_count
394 # cleanup - Delete VNFs(if any)
395 for vnfd_id
in uploaded_vnfs
.values():
396 vnfd_proxy
.delete_config(xpath
.format(quoted_key(vnfd_id
)))
398 @pytest.mark
.feature("upload-image")
399 def test_upload_images(self
, descriptor_images
, cloud_host
, cloud_user
, cloud_tenants
):
401 openstack
= rift
.auto
.mano
.OpenstackManoSetup(
404 [(tenant
, "private") for tenant
in cloud_tenants
])
406 for image_location
in descriptor_images
:
407 image
= RwcalYang
.YangData_RwProject_Project_VimResources_ImageinfoList
.from_dict({
408 'name': os
.path
.basename(image_location
),
409 'location': image_location
,
410 'disk_format': 'qcow2',
411 'container_format': 'bare'})
412 openstack
.create_image(image
)
415 def test_set_scaling_params(self
, nsd_proxy
):
416 nsds
= nsd_proxy
.get('/rw-project:project[rw-project:name="default"]/nsd-catalog')
418 for scaling_group
in nsd
.scaling_group_descriptor
:
419 scaling_group
.max_instance_count
= 2
421 nsd_proxy
.replace_config('/rw-project:project[rw-project:name="default"]/nsd-catalog/nsd[id={}]'.format(
422 quoted_key(nsd
.id)), nsd
)
424 @pytest.mark
.skipif(not (pytest
.config
.getoption("--update-vnfd-instantiate") or pytest
.config
.getoption("--port-sequencing")),
425 reason
="need --update-vnfd-instantiate or --port-sequencing option to run")
426 def test_update_vnfd(self
, vnfd_proxy
, iteration
, port_sequencing_intf_positions
):
427 """Updates few fields of ping pong VNFDs and verify those changes
429 xpath
= "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]"
430 vnfd_catalog
= "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd"
432 if iteration
==0 and pytest
.config
.getoption("--port-sequencing"):
436 vnfds
= vnfd_proxy
.get(vnfd_catalog
, list_obj
=True)
439 # Get ping pong VNFDs
440 for vnfd
in vnfds
.vnfd
:
441 if 'ping' in vnfd
.name
:
443 if 'pong' in vnfd
.name
:
447 vnfds_dict
= get_vnfd()
448 update_data
= {'ping':{'static_ip_address':'31.31.31.60'}, 'pong':{'static_ip_address':'31.31.31.90'}}
449 port_sequencing_intf_positions_tmp
= port_sequencing_intf_positions
[:]
451 # Modify/add fields in VNFDs
452 for name_
, vnfd
in vnfds_dict
.items():
453 if pytest
.config
.getoption('--update-vnfd-instantiate'):
454 vnfd
.vdu
[0].interface
[1].static_ip_address
= update_data
[name_
]['static_ip_address']
455 if pytest
.config
.getoption('--port-sequencing'):
456 vnfd_intf_list
= vnfd
.vdu
[0].interface
457 # for ping vnfd, remove positional values from all interfaces
458 # for pong vnfd, modify the positional values as per fixture port_sequencing_intf_positions
459 if 'ping' in vnfd
.name
:
461 for i
in range(len(vnfd_intf_list
)):
462 tmp_intf_dict
= vnfd_intf_list
[-1].as_dict()
463 del tmp_intf_dict
['position']
465 tmp_intf_list
.append(tmp_intf_dict
)
466 for intf_dict_without_positional_values
in tmp_intf_list
:
467 new_intf
= vnfd
.vdu
[0].interface
.add()
468 new_intf
.from_dict(intf_dict_without_positional_values
)
470 if 'pong' in vnfd
.name
:
471 for intf
in vnfd_intf_list
:
472 if 'position' in intf
:
473 intf
.position
= port_sequencing_intf_positions_tmp
.pop()
475 # Update/save the VNFDs
476 for vnfd
in vnfds_dict
.values():
477 vnfd_proxy
.replace_config(xpath
.format(quoted_key(vnfd
.id)), vnfd
)
479 # Match whether data is updated
480 vnfds_dict
= get_vnfd()
482 for name_
, vnfd
in vnfds_dict
.items():
483 if pytest
.config
.getoption('--update-vnfd-instantiate'):
484 assert vnfd
.vdu
[0].interface
[1].static_ip_address
== update_data
[name_
]['static_ip_address']
485 if pytest
.config
.getoption('--port-sequencing'):
486 if 'ping' in vnfd
.name
:
487 for intf
in vnfd
.vdu
[0].interface
:
488 assert 'position' not in intf
.as_dict()
489 if 'pong' in vnfd
.name
:
490 tmp_positional_values_list
= []
491 for intf
in vnfd
.vdu
[0].interface
:
492 if 'position' in intf
.as_dict():
493 tmp_positional_values_list
.append(intf
.position
)
494 assert set(tmp_positional_values_list
) == set(port_sequencing_intf_positions
)
496 def test_instantiate_nsr(self
, logger
, nsd_proxy
, rwnsr_proxy
, base_proxy
, cloud_account_name
):
498 def verify_input_parameters(running_config
, config_param
):
500 Verify the configured parameter set against the running configuration
502 for run_input_param
in running_config
.input_parameter
:
503 if (run_input_param
.xpath
== config_param
.xpath
and
504 run_input_param
.value
== config_param
.value
):
507 assert False, ("Verification of configured input parameters: { xpath:%s, value:%s} "
508 "is unsuccessful.\nRunning configuration: %s" % (config_param
.xpath
,
510 running_config
.input_parameter
))
512 catalog
= nsd_proxy
.get_config('/rw-project:project[rw-project:name="default"]/nsd-catalog')
515 input_parameters
= []
516 descr_xpath
= "/nsd:nsd-catalog/nsd:nsd/nsd:vendor"
517 descr_value
= "New Vendor"
518 in_param_id
= str(uuid
.uuid4())
520 input_param_1
= NsrYang
.YangData_RwProject_Project_NsInstanceConfig_Nsr_InputParameter(
524 input_parameters
.append(input_param_1
)
526 nsr
= rift
.auto
.descriptor
.create_nsr(cloud_account_name
, nsd
.name
, nsd
, input_param_list
=input_parameters
)
528 logger
.info("Instantiating the Network Service")
529 rwnsr_proxy
.create_config('/rw-project:project[rw-project:name="default"]/ns-instance-config/nsr', nsr
)
531 nsr_opdata
= rwnsr_proxy
.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata/nsr[ns-instance-config-ref={}]'.format(quoted_key(nsr
.id)))
532 assert nsr_opdata
is not None
534 # Verify the input parameter configuration
535 running_config
= rwnsr_proxy
.get_config("/rw-project:project[rw-project:name='default']/ns-instance-config/nsr[id=%s]" % quoted_key(nsr
.id))
536 for input_param
in input_parameters
:
537 verify_input_parameters(running_config
, input_param
)
539 def test_wait_for_nsr_started(self
, rwnsr_proxy
):
540 """Verify NSR instances enter 'running' operational-status
542 nsr_opdata
= rwnsr_proxy
.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
543 nsrs
= nsr_opdata
.nsr
546 xpath
= "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/operational-status".format(quoted_key(nsr
.ns_instance_config_ref
))
547 rwnsr_proxy
.wait_for(xpath
, "running", fail_on
=['failed'], timeout
=400)
549 def test_wait_for_nsr_configured(self
, rwnsr_proxy
):
550 """Verify NSR instances enter 'configured' config-status
552 nsr_opdata
= rwnsr_proxy
.get('/rw-project:project[rw-project:name="default"]/ns-instance-opdata')
553 nsrs
= nsr_opdata
.nsr
556 xpath
= "/rw-project:project[rw-project:name='default']/ns-instance-opdata/nsr[ns-instance-config-ref={}]/config-status".format(quoted_key(nsr
.ns_instance_config_ref
))
557 rwnsr_proxy
.wait_for(xpath
, "configured", fail_on
=['failed'], timeout
=400)
560 @pytest.mark
.teardown('nsr')
561 @pytest.mark
.depends('launchpad')
562 @pytest.mark
.incremental
563 class TestNsrTeardown(object):
565 def test_delete_embedded_images(self
, random_image_name
, cloud_accounts
, cal
):
566 """Deletes images embedded in VNF from VIM. It only deletes additional images, not
567 the Fedora ping pong images"""
568 for account
in cloud_accounts
:
569 rc
, rsp
= cal
.get_image_list(RwcalYang
.YangData_RwProject_Project_CloudAccounts_CloudAccountList
.from_dict(account
.as_dict()))
570 assert rc
== RwTypes
.RwStatus
.SUCCESS
572 for image
in rsp
.imageinfo_list
:
573 if random_image_name
in image
.name
:
574 cal
.delete_image(RwcalYang
.YangData_RwProject_Project_CloudAccounts_CloudAccountList
.from_dict(account
.as_dict()), image
.id)
576 def test_terminate_nsr(self
, rwvnfr_proxy
, rwnsr_proxy
, logger
, cloud_type
,
577 rwvlr_proxy
, vim_clients
, cloud_account_name
):
579 Terminate the instance and check if the record is deleted.
582 1. NSR record is deleted from instance-config.
585 # Collects the Cloud resources like ports, networks, VMs used by the current NS
586 ns_cloud_resources
= get_ns_cloud_resources(rwvnfr_proxy
, rwvlr_proxy
)
587 logger
.info('Cloud resources used by NS: {}'.format(ns_cloud_resources
))
589 logger
.debug("Terminating NSR")
590 wait_after_kill
= True
591 if cloud_type
== "mock":
592 wait_after_kill
= False
594 rift
.auto
.descriptor
.terminate_nsr(rwvnfr_proxy
, rwnsr_proxy
,
596 wait_after_kill
=wait_after_kill
)
597 # Collect all the ports, networks VMs from openstack and
598 # check if previously collected resources (i.e ns_cloud_resources) are still present in this collection
599 start_time
= time
.time()
600 while time
.time()-start_time
< 240:
602 vim_client
= vim_clients
[cloud_account_name
]
603 vim_resources
= dict()
604 vim_resources
['networks'] = vim_client
.neutron_network_list()
605 vim_resources
['vms'] = vim_client
.nova_server_list()
606 vim_resources
['ports'] = vim_client
.neutron_port_list()
608 for resource_type
in ns_cloud_resources
.keys():
609 logger
.debug("Verifying all %s resources have been removed from vim", resource_type
)
611 vim_resource
['id'] for vim_resource
in vim_resources
[resource_type
]
612 if 'shared' not in vim_resource
.keys()
613 or not vim_resource
['shared']
615 for ns_resource_id
in ns_cloud_resources
[resource_type
]:
616 logger
.debug('Verifying %s resource %s removed', resource_type
, ns_resource_id
)
617 assert ns_resource_id
not in vim_resource_ids
619 except AssertionError:
621 raise AssertionError("Resources not cleared from openstack")
623 def test_delete_records(self
, nsd_proxy
, vnfd_proxy
, iteration
):
624 """Delete the NSD & VNFD records
627 The records are deleted.
629 # We are instantiating the NS twice in port-sequencing test. Seconds NS instantiation will be using already uploaded
630 # descriptors with updated interface positional values.
631 if iteration
==0 and pytest
.config
.getoption("--port-sequencing"):
633 nsds
= nsd_proxy
.get("/rw-project:project[rw-project:name='default']/nsd-catalog/nsd", list_obj
=True)
635 xpath
= "/rw-project:project[rw-project:name='default']/nsd-catalog/nsd[id={}]".format(quoted_key(nsd
.id))
636 nsd_proxy
.delete_config(xpath
)
638 nsds
= nsd_proxy
.get("/rw-project:project[rw-project:name='default']/nsd-catalog/nsd", list_obj
=True)
639 assert nsds
is None or len(nsds
.nsd
) == 0
641 vnfds
= vnfd_proxy
.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj
=True)
642 for vnfd_record
in vnfds
.vnfd
:
643 xpath
= "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]".format(quoted_key(vnfd_record
.id))
644 vnfd_proxy
.delete_config(xpath
)
646 vnfds
= vnfd_proxy
.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj
=True)
647 assert vnfds
is None or len(vnfds
.vnfd
) == 0