+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_with_inst_flavor_id(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """Instantiation volume list is empty."""
+ target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
+
+ target_vdu["interfaces"] = interfaces_wth_all_positions
+
+ vdu_instantiation_flavor_id = "flavor_test"
+
+ target_vdu["additionalParams"] = {
+ "OSM": {"vim_flavor_id": vdu_instantiation_flavor_id}
+ }
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = []
+
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+ expected_extra_dict_copy = deepcopy(expected_extra_dict3)
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_locate_vdu_interfaces.assert_not_called()
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ expected_extra_dict_copy,
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+ self.assertDictEqual(result, expected_extra_dict_copy)
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
+ mock_prepare_vdu_affinity_group_list.assert_called_once()
+ mock_find_persistent_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_wth_affinity_groups(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """There is cloud-config."""
+ target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = interfaces_wth_all_positions
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = [
+ "affinity_group_1",
+ "affinity_group_2",
+ ]
+
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+ expected_extra_dict3 = deepcopy(expected_extra_dict2)
+ expected_extra_dict3["params"]["affinity_group_list"] = [
+ "affinity_group_1",
+ "affinity_group_2",
+ ]
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ self.assertDictEqual(result, expected_extra_dict3)
+ mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_locate_vdu_interfaces.assert_not_called()
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ expected_extra_dict3,
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
+ mock_prepare_vdu_affinity_group_list.assert_called_once()
+ mock_find_persistent_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_wth_cloud_config(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """There is cloud-config."""
+ target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = interfaces_wth_all_positions
+ mock_prepare_vdu_cloud_init.return_value = {
+ "user-data": user_data,
+ "boot-data-drive": "vda",
+ }
+ mock_prepare_vdu_affinity_group_list.return_value = []
+
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+ expected_extra_dict3 = deepcopy(expected_extra_dict2)
+ expected_extra_dict3["params"]["cloud_config"] = {
+ "user-data": user_data,
+ "boot-data-drive": "vda",
+ }
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_locate_vdu_interfaces.assert_not_called()
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ expected_extra_dict3,
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+ self.assertDictEqual(result, expected_extra_dict3)
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(
+ target_vdu, None, {"user-data": user_data, "boot-data-drive": "vda"}
+ )
+ mock_prepare_vdu_affinity_group_list.assert_called_once()
+ mock_find_persistent_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_wthout_persistent_storage(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """There is not any persistent storage."""
+ target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = interfaces_wth_all_positions
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = []
+
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+ expected_extra_dict_copy = deepcopy(expected_extra_dict2)
+ vnfd = deepcopy(vnfd_wthout_persistent_storage)
+ db.get_one.return_value = vnfd
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_locate_vdu_interfaces.assert_not_called()
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ expected_extra_dict_copy,
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+ self.assertDictEqual(result, expected_extra_dict_copy)
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
+ mock_prepare_vdu_affinity_group_list.assert_called_once()
+ mock_find_persistent_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_interfaces_partially_located(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """Some interfaces have position."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = [
+ {
+ "name": "vdu-eth1",
+ "ns-vld-id": "net1",
+ },
+ {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
+ {
+ "name": "vdu-eth3",
+ "ns-vld-id": "mgmtnet",
+ },
+ ]
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = []
+ persistent_root_disk = {
+ "persistent-root-volume": {
+ "image_id": "ubuntu20.04",
+ "size": "10",
+ "keep": True,
+ }
+ }
+ mock_find_persistent_root_volumes.return_value = persistent_root_disk
+
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ expected_extra_dict_copy = deepcopy(expected_extra_dict)
+ mock_sort_vdu_interfaces.assert_not_called()
+ mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ expected_extra_dict_copy,
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+ self.assertDictEqual(result, expected_extra_dict_copy)
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
+ mock_prepare_vdu_affinity_group_list.assert_called_once()
+ mock_find_persistent_volumes.assert_not_called()
+ mock_find_persistent_root_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_no_interface_position(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """Interfaces do not have position."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = interfaces_wthout_positions
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = []
+ persistent_root_disk = {
+ "persistent-root-volume": {
+ "image_id": "ubuntu20.04",
+ "size": "10",
+ "keep": True,
+ }
+ }
+ mock_find_persistent_root_volumes.return_value = persistent_root_disk
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ expected_extra_dict_copy = deepcopy(expected_extra_dict)
+ mock_sort_vdu_interfaces.assert_not_called()
+ mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ expected_extra_dict_copy,
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+ self.assertDictEqual(result, expected_extra_dict_copy)
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
+ mock_prepare_vdu_affinity_group_list.assert_called_once()
+ mock_find_persistent_volumes.assert_not_called()
+ mock_find_persistent_root_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """Prepare vdu interfaces method raises exception."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = interfaces_wthout_positions
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = []
+ persistent_root_disk = {
+ "persistent-root-volume": {
+ "image_id": "ubuntu20.04",
+ "size": "10",
+ "keep": True,
+ }
+ }
+ mock_find_persistent_root_volumes.return_value = persistent_root_disk
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+ mock_prepare_vdu_interfaces.side_effect = TypeError
+
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ with self.assertRaises(Exception) as err:
+ Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ self.assertEqual(type(err), TypeError)
+ mock_sort_vdu_interfaces.assert_not_called()
+ mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_prepare_vdu_cloud_init.assert_not_called()
+ mock_add_persistent_root_disk_to_disk_list.assert_not_called()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
+ mock_prepare_vdu_interfaces.assert_called_once()
+ mock_prepare_vdu_ssh_keys.assert_not_called()
+ mock_prepare_vdu_affinity_group_list.assert_not_called()
+ mock_find_persistent_volumes.assert_not_called()
+ mock_find_persistent_root_volumes.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
+ @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
+ @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
+ def test_process_vdu_params_add_persistent_root_disk_raises_exception(
+ self,
+ mock_prepare_vdu_affinity_group_list,
+ mock_add_persistent_ordinary_disks_to_disk_list,
+ mock_add_persistent_root_disk_to_disk_list,
+ mock_find_persistent_volumes,
+ mock_find_persistent_root_volumes,
+ mock_prepare_vdu_ssh_keys,
+ mock_prepare_vdu_cloud_init,
+ mock_prepare_vdu_interfaces,
+ mock_locate_vdu_interfaces,
+ mock_sort_vdu_interfaces,
+ ):
+ """Add persistent root disk method raises exception."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+
+ self.maxDiff = None
+ target_vdu["interfaces"] = interfaces_wthout_positions
+ mock_prepare_vdu_cloud_init.return_value = {}
+ mock_prepare_vdu_affinity_group_list.return_value = []
+ mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError
+ new_kwargs = deepcopy(kwargs)
+ new_kwargs.update(
+ {
+ "vnfr_id": vnfr_id,
+ "nsr_id": nsr_id,
+ "tasks_by_target_record_id": {},
+ "logger": "logger",
+ }
+ )
+
+ vnfd = deepcopy(vnfd_wth_persistent_storage)
+ db.get_one.return_value = vnfd
+ with self.assertRaises(Exception) as err:
+ Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
+ )
+ self.assertEqual(type(err), KeyError)
+ mock_sort_vdu_interfaces.assert_not_called()
+ mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
+ mock_prepare_vdu_cloud_init.assert_called_once()
+ mock_add_persistent_root_disk_to_disk_list.assert_called_once()
+ mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
+ mock_prepare_vdu_interfaces.assert_called_once_with(
+ target_vdu,
+ {
+ "depends_on": [
+ f"{ns_preffix}:image.0",
+ f"{ns_preffix}:flavor.0",
+ ]
+ },
+ ns_preffix,
+ vnf_preffix,
+ "logger",
+ {},
+ [],
+ )
+
+ mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
+ mock_prepare_vdu_affinity_group_list.assert_not_called()
+ mock_find_persistent_volumes.assert_not_called()
+ mock_find_persistent_root_volumes.assert_not_called()
+
+ def test_select_persistent_root_disk(self):
+ vdu = deepcopy(target_vdu_wth_persistent_storage)
+ vdu["virtual-storage-desc"] = [
+ "persistent-root-volume",
+ "persistent-volume2",
+ "ephemeral-volume",
+ ]
+ vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
+ expected_result = vsd
+ result = Ns._select_persistent_root_disk(vsd, vdu)
+ self.assertEqual(result, expected_result)
+
+ def test_select_persistent_root_disk_first_vsd_is_different(self):
+ """VDU first virtual-storage-desc is different than vsd id."""
+ vdu = deepcopy(target_vdu_wth_persistent_storage)
+ vdu["virtual-storage-desc"] = [
+ "persistent-volume2",
+ "persistent-root-volume",
+ "ephemeral-volume",
+ ]
+ vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
+ expected_result = None
+ result = Ns._select_persistent_root_disk(vsd, vdu)
+ self.assertEqual(result, expected_result)
+
+ def test_select_persistent_root_disk_vsd_is_not_persistent(self):
+ """vsd type is not persistent."""
+ vdu = deepcopy(target_vdu_wth_persistent_storage)
+ vdu["virtual-storage-desc"] = [
+ "persistent-volume2",
+ "persistent-root-volume",
+ "ephemeral-volume",
+ ]
+ vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
+ vsd["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
+ expected_result = None
+ result = Ns._select_persistent_root_disk(vsd, vdu)
+ self.assertEqual(result, expected_result)
+
+ def test_select_persistent_root_disk_vsd_does_not_have_size(self):
+ """vsd size is None."""
+ vdu = deepcopy(target_vdu_wth_persistent_storage)
+ vdu["virtual-storage-desc"] = [
+ "persistent-volume2",
+ "persistent-root-volume",
+ "ephemeral-volume",
+ ]
+ vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
+ vsd["size-of-storage"] = None
+ expected_result = None
+ result = Ns._select_persistent_root_disk(vsd, vdu)
+ self.assertEqual(result, expected_result)
+
+ def test_select_persistent_root_disk_vdu_wthout_vsd(self):
+ """VDU does not have virtual-storage-desc."""
+ vdu = deepcopy(target_vdu_wth_persistent_storage)
+ vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
+ expected_result = None
+ result = Ns._select_persistent_root_disk(vsd, vdu)
+ self.assertEqual(result, expected_result)
+
+ def test_select_persistent_root_disk_invalid_vsd_type(self):
+ """vsd is list, expected to be a dict."""
+ vdu = deepcopy(target_vdu_wth_persistent_storage)
+ vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
+ with self.assertRaises(AttributeError):
+ Ns._select_persistent_root_disk(vsd, vdu)