+ """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
+ interface = {
+ "name": "vdu-eth0",
+ "vcpi": "sample_vcpi",
+ "port_security": True,
+ "port_security_disable_strategy": "allow-address-pairs",
+ "floating_ip": "10.1.1.12",
+ "ns-vld-id": "mgmtnet",
+ "vnf-vld-id": "mgmt_cp_int",
+ "type": "PCI-PASSTHROUGH",
+ }
+ mock_deep_get.return_value = {}
+ tasks_by_target_record_id = {}
+ net_text = ns_preffix
+ net_item = {}
+ expected_net_item = {
+ "use": "data",
+ "model": "PCI-PASSTHROUGH",
+ "type": "PCI-PASSTHROUGH",
+ }
+ self.ns._prepare_type_of_interface(
+ interface, tasks_by_target_record_id, net_text, net_item
+ )
+ self.assertDictEqual(net_item, expected_net_item)
+ mock_deep_get.assert_called_once_with(
+ tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type"
+ )
+
+ @patch("osm_ng_ro.ns.deep_get")
+ def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get):
+ """Interface type is mgmt."""
+ interface = {
+ "name": "vdu-eth0",
+ "vcpi": "sample_vcpi",
+ "port_security": True,
+ "port_security_disable_strategy": "allow-address-pairs",
+ "floating_ip": "10.1.1.12",
+ "ns-vld-id": "mgmtnet",
+ "vnf-vld-id": "mgmt_cp_int",
+ "type": "OM-MGMT",
+ }
+ tasks_by_target_record_id = {}
+ net_text = ns_preffix
+ net_item = {}
+ expected_net_item = {
+ "use": "mgmt",
+ }
+ self.ns._prepare_type_of_interface(
+ interface, tasks_by_target_record_id, net_text, net_item
+ )
+ self.assertDictEqual(net_item, expected_net_item)
+ mock_deep_get.assert_not_called()
+
+ @patch("osm_ng_ro.ns.deep_get")
+ def test_prepare_type_of_interface_type_bridge(self, mock_deep_get):
+ """Interface type is bridge."""
+ interface = {
+ "name": "vdu-eth0",
+ "vcpi": "sample_vcpi",
+ "port_security": True,
+ "port_security_disable_strategy": "allow-address-pairs",
+ "floating_ip": "10.1.1.12",
+ "ns-vld-id": "mgmtnet",
+ "vnf-vld-id": "mgmt_cp_int",
+ }
+ tasks_by_target_record_id = {}
+ net_text = ns_preffix
+ net_item = {}
+ expected_net_item = {
+ "use": "bridge",
+ "model": None,
+ }
+ self.ns._prepare_type_of_interface(
+ interface, tasks_by_target_record_id, net_text, net_item
+ )
+ self.assertDictEqual(net_item, expected_net_item)
+ mock_deep_get.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
+ @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
+ @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
+ def test_prepare_vdu_interfaces(
+ self,
+ mock_type_of_interface,
+ mock_item_of_interface,
+ mock_port_security,
+ mock_vld_information_of_interface,
+ ):
+ """Prepare vdu interfaces successfully."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+ interface_1 = {
+ "name": "vdu-eth1",
+ "ns-vld-id": "net1",
+ "ip-address": "13.2.12.31",
+ "mgmt-interface": True,
+ }
+ interface_2 = {
+ "name": "vdu-eth2",
+ "vnf-vld-id": "net2",
+ "mac-address": "d0:94:66:ed:fc:e2",
+ }
+ interface_3 = {
+ "name": "vdu-eth3",
+ "ns-vld-id": "mgmtnet",
+ }
+ target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
+ extra_dict = {
+ "params": "test_params",
+ "find_params": "test_find_params",
+ "depends_on": [],
+ }
+
+ net_text_1 = f"{ns_preffix}:net1"
+ net_text_2 = f"{vnf_preffix}:net2"
+ net_text_3 = f"{ns_preffix}:mgmtnet"
+ net_item_1 = {
+ "name": "vdu-eth1",
+ "net_id": f"TASK-{ns_preffix}",
+ "type": "virtual",
+ }
+ net_item_2 = {
+ "name": "vdu-eth2",
+ "net_id": f"TASK-{ns_preffix}",
+ "type": "virtual",
+ }
+ net_item_3 = {
+ "name": "vdu-eth3",
+ "net_id": f"TASK-{ns_preffix}",
+ "type": "virtual",
+ }
+ mock_item_of_interface.side_effect = [net_item_1, net_item_2, net_item_3]
+ mock_vld_information_of_interface.side_effect = [
+ net_text_1,
+ net_text_2,
+ net_text_3,
+ ]
+ net_list = []
+ expected_extra_dict = {
+ "params": "test_params",
+ "find_params": "test_find_params",
+ "depends_on": [net_text_1, net_text_2, net_text_3],
+ "mgmt_vdu_interface": 0,
+ }
+ updated_net_item1 = deepcopy(net_item_1)
+ updated_net_item1.update({"ip_address": "13.2.12.31"})
+ updated_net_item2 = deepcopy(net_item_2)
+ updated_net_item2.update({"mac_address": "d0:94:66:ed:fc:e2"})
+ expected_net_list = [updated_net_item1, updated_net_item2, net_item_3]
+ self.ns._prepare_vdu_interfaces(
+ target_vdu,
+ extra_dict,
+ ns_preffix,
+ vnf_preffix,
+ self.logger,
+ tasks_by_target_record_id,
+ net_list,
+ )
+ _call_mock_vld_information_of_interface = (
+ mock_vld_information_of_interface.call_args_list
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[0][0],
+ (interface_1, ns_preffix, vnf_preffix),
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[1][0],
+ (interface_2, ns_preffix, vnf_preffix),
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[2][0],
+ (interface_3, ns_preffix, vnf_preffix),
+ )
+
+ _call_mock_port_security = mock_port_security.call_args_list
+ self.assertEqual(_call_mock_port_security[0].args[0], interface_1)
+ self.assertEqual(_call_mock_port_security[1].args[0], interface_2)
+ self.assertEqual(_call_mock_port_security[2].args[0], interface_3)
+
+ _call_mock_item_of_interface = mock_item_of_interface.call_args_list
+ self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1))
+ self.assertEqual(_call_mock_item_of_interface[1][0], (interface_2, net_text_2))
+ self.assertEqual(_call_mock_item_of_interface[2][0], (interface_3, net_text_3))
+
+ _call_mock_type_of_interface = mock_type_of_interface.call_args_list
+ self.assertEqual(
+ _call_mock_type_of_interface[0][0],
+ (interface_1, tasks_by_target_record_id, net_text_1, net_item_1),
+ )
+ self.assertEqual(
+ _call_mock_type_of_interface[1][0],
+ (interface_2, tasks_by_target_record_id, net_text_2, net_item_2),
+ )
+ self.assertEqual(
+ _call_mock_type_of_interface[2][0],
+ (interface_3, tasks_by_target_record_id, net_text_3, net_item_3),
+ )
+ self.assertEqual(net_list, expected_net_list)
+ self.assertEqual(extra_dict, expected_extra_dict)
+ self.logger.error.assert_not_called()
+
+ @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
+ @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
+ @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
+ def test_prepare_vdu_interfaces_create_net_item_raise_exception(
+ self,
+ mock_type_of_interface,
+ mock_item_of_interface,
+ mock_port_security,
+ mock_vld_information_of_interface,
+ ):
+ """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+ interface_1 = {
+ "name": "vdu-eth1",
+ "ns-vld-id": "net1",
+ "ip-address": "13.2.12.31",
+ "mgmt-interface": True,
+ }
+ interface_2 = {
+ "name": "vdu-eth2",
+ "vnf-vld-id": "net2",
+ "mac-address": "d0:94:66:ed:fc:e2",
+ }
+ interface_3 = {
+ "name": "vdu-eth3",
+ "ns-vld-id": "mgmtnet",
+ }
+ target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
+ extra_dict = {
+ "params": "test_params",
+ "find_params": "test_find_params",
+ "depends_on": [],
+ }
+ net_text_1 = f"{ns_preffix}:net1"
+ mock_item_of_interface.side_effect = [TypeError, TypeError, TypeError]
+
+ mock_vld_information_of_interface.side_effect = [net_text_1]
+ net_list = []
+ expected_extra_dict = {
+ "params": "test_params",
+ "find_params": "test_find_params",
+ "depends_on": [net_text_1],
+ }
+ with self.assertRaises(TypeError):
+ self.ns._prepare_vdu_interfaces(
+ target_vdu,
+ extra_dict,
+ ns_preffix,
+ vnf_preffix,
+ self.logger,
+ tasks_by_target_record_id,
+ net_list,
+ )
+
+ _call_mock_vld_information_of_interface = (
+ mock_vld_information_of_interface.call_args_list
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[0][0],
+ (interface_1, ns_preffix, vnf_preffix),
+ )
+
+ _call_mock_port_security = mock_port_security.call_args_list
+ self.assertEqual(_call_mock_port_security[0].args[0], interface_1)
+
+ _call_mock_item_of_interface = mock_item_of_interface.call_args_list
+ self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1))
+
+ mock_type_of_interface.assert_not_called()
+ self.logger.error.assert_not_called()
+ self.assertEqual(net_list, [])
+ self.assertEqual(extra_dict, expected_extra_dict)
+
+ @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
+ @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
+ @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
+ @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
+ def test_prepare_vdu_interfaces_vld_information_is_empty(
+ self,
+ mock_type_of_interface,
+ mock_item_of_interface,
+ mock_port_security,
+ mock_vld_information_of_interface,
+ ):
+ """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
+ target_vdu = deepcopy(target_vdu_wth_persistent_storage)
+ interface_1 = {
+ "name": "vdu-eth1",
+ "ns-vld-id": "net1",
+ "ip-address": "13.2.12.31",
+ "mgmt-interface": True,
+ }
+ interface_2 = {
+ "name": "vdu-eth2",
+ "vnf-vld-id": "net2",
+ "mac-address": "d0:94:66:ed:fc:e2",
+ }
+ interface_3 = {
+ "name": "vdu-eth3",
+ "ns-vld-id": "mgmtnet",
+ }
+ target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
+ extra_dict = {
+ "params": "test_params",
+ "find_params": "test_find_params",
+ "depends_on": [],
+ }
+ mock_vld_information_of_interface.side_effect = ["", "", ""]
+ net_list = []
+ self.ns._prepare_vdu_interfaces(
+ target_vdu,
+ extra_dict,
+ ns_preffix,
+ vnf_preffix,
+ self.logger,
+ tasks_by_target_record_id,
+ net_list,
+ )
+
+ _call_mock_vld_information_of_interface = (
+ mock_vld_information_of_interface.call_args_list
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[0][0],
+ (interface_1, ns_preffix, vnf_preffix),
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[1][0],
+ (interface_2, ns_preffix, vnf_preffix),
+ )
+ self.assertEqual(
+ _call_mock_vld_information_of_interface[2][0],
+ (interface_3, ns_preffix, vnf_preffix),
+ )
+
+ _call_logger = self.logger.error.call_args_list
+ self.assertEqual(
+ _call_logger[0][0],
+ ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
+ )
+ self.assertEqual(
+ _call_logger[1][0],
+ ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
+ )
+ self.assertEqual(
+ _call_logger[2][0],
+ ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
+ )
+ self.assertEqual(net_list, [])
+ self.assertEqual(
+ extra_dict,
+ {
+ "params": "test_params",
+ "find_params": "test_find_params",
+ "depends_on": [],