+ @pytest.mark.skipif(not pytest.config.getoption('--upload-images-multiple-accounts'),
+ reason="need --upload-images-multiple-accounts option to run")
+ def test_images_uploaded_multiple_accounts(self, logger, mgmt_session, random_image_name, cloud_accounts, cal):
+ image_mgmt_proxy = mgmt_session.proxy(RwImageMgmtYang)
+ upload_jobs = image_mgmt_proxy.get('/rw-project:project[rw-project:name="default"]/upload-jobs')
+ logger.info('Embedded image name(apart from ping pong Fedora images): {}'.format(random_image_name))
+ for job in upload_jobs.job:
+ assert image_mgmt_proxy.wait_for('/rw-project:project[rw-project:name="default"]/upload-jobs/job[id={}]/status'.format(quoted_key(job.id)), 'COMPLETED', timeout=240)
+ assert len(job.upload_tasks) == len(cloud_accounts)
+ for upload_task in job.upload_tasks:
+ assert upload_task.status == 'COMPLETED'
+
+ assert len(upload_jobs.job) == 3
+
+ # Check whether images are present in VIMs
+ for account in cloud_accounts:
+ rc, res = cal.get_image_list(RwcalYang.YangData_RwProject_Project_CloudAccounts_CloudAccountList.from_dict(account.as_dict()))
+ assert rc == RwTypes.RwStatus.SUCCESS
+ assert [image for image in res.imageinfo_list if image.name == random_image_name]
+
+ @pytest.mark.skipif(not pytest.config.getoption("--vnf-onboard-delete"), reason="need --vnf-onboard-delete option to run")
+ def test_upload_delete_descriptors(self, logger, mgmt_session, vnfd_proxy, descriptors, vnf_onboard_delete):
+ """Randomly upload and delete VNFs. With each upload/delete, verify if the VNF
+ gets uploaded/deleted successfully.
+ """
+ xpath = "/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd[id={}]"
+ iteration, vnf_count = map(int, vnf_onboard_delete.split(','))
+
+ # Get the VNF paths to be used for onboarding
+ all_vnfs = [pkg_path for pkg_path in descriptors if '_nsd' not in os.path.basename(pkg_path)]
+ if vnf_count > len(all_vnfs):
+ vnf_count = len(all_vnfs)
+ available_vnfs = random.sample(all_vnfs, vnf_count)
+
+ # Get the add, delete iterations
+ add_del_seq = list(np.random.choice(['add', 'del'], iteration))
+ random.shuffle(add_del_seq)
+ logger.info('Vnf add-delete iteration sequence: {}'.format(add_del_seq))
+
+ uploaded_vnfs = {}
+
+ def get_vnfd_list():
+ """Returns list of VNFDs"""
+ vnfd_obj = vnfd_proxy.get("/rw-project:project[rw-project:name='default']/vnfd-catalog/vnfd", list_obj=True)
+ return vnfd_obj.vnfd if vnfd_obj else []
+
+ def delete_vnfd():
+ """Deletes a VNFD"""
+ vnf_path, vnfd_id = random.choice(list(uploaded_vnfs.items()))
+ logger.info('Deleting VNF {} having id {}'.format(os.path.basename(vnf_path), vnfd_id))
+ vnfd_proxy.delete_config(xpath.format(quoted_key(vnfd_id)))
+ uploaded_vnfs.pop(vnf_path)
+ available_vnfs.append(vnf_path)
+ assert not [vnfd for vnfd in get_vnfd_list() if vnfd.id == vnfd_id]
+
+ for op_type in add_del_seq:
+ if op_type =='del':
+ if uploaded_vnfs:
+ delete_vnfd()
+ continue
+ op_type = 'add'
+
+ if op_type == 'add':
+ if not available_vnfs:
+ delete_vnfd()
+ continue
+ vnf_path = random.choice(available_vnfs)
+ logger.info('Adding VNF {}'.format(os.path.basename(vnf_path)))
+ rift.auto.descriptor.onboard(mgmt_session, vnf_path)
+ vnfs = get_vnfd_list()
+ assert len(vnfs) == len(uploaded_vnfs) + 1
+ vnfd = [vnfd for vnfd in vnfs if vnfd.id not in list(uploaded_vnfs.values())]
+ assert len(vnfd) == 1
+ vnfd = vnfd[0]
+ assert vnfd.name
+ assert vnfd.connection_point
+ assert vnfd.vdu
+ uploaded_vnfs[vnf_path] = vnfd.id
+ available_vnfs.remove(vnf_path)
+
+ assert len(get_vnfd_list()) == len(uploaded_vnfs)
+ logger.info('Onboarded VNFs : {}'.format(uploaded_vnfs))
+
+ assert len(available_vnfs) + len(uploaded_vnfs) == vnf_count
+ # cleanup - Delete VNFs(if any)
+ for vnfd_id in uploaded_vnfs.values():
+ vnfd_proxy.delete_config(xpath.format(quoted_key(vnfd_id)))
+