+
+ def test__ip_profile_to_ro_with_none(self):
+ ip_profile = None
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertIsNone(result)
+
+ def test__ip_profile_to_ro_with_empty_profile(self):
+ ip_profile = {}
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertIsNone(result)
+
+ def test__ip_profile_to_ro_with_wrong_profile(self):
+ ip_profile = {
+ "no-profile": "here",
+ }
+ expected_result = {
+ "ip_version": "IPv4",
+ "subnet_address": None,
+ "gateway_address": None,
+ "dhcp_enabled": False,
+ "dhcp_start_address": None,
+ "dhcp_count": None,
+ }
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertDictEqual(expected_result, result)
+
+ def test__ip_profile_to_ro_with_ipv4_profile(self):
+ ip_profile = {
+ "ip-version": "ipv4",
+ "subnet-address": "192.168.0.0/24",
+ "gateway-address": "192.168.0.254",
+ "dhcp-params": {
+ "enabled": True,
+ "start-address": "192.168.0.10",
+ "count": 25,
+ },
+ }
+ expected_result = {
+ "ip_version": "IPv4",
+ "subnet_address": "192.168.0.0/24",
+ "gateway_address": "192.168.0.254",
+ "dhcp_enabled": True,
+ "dhcp_start_address": "192.168.0.10",
+ "dhcp_count": 25,
+ }
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertDictEqual(expected_result, result)
+
+ def test__ip_profile_to_ro_with_ipv6_profile(self):
+ ip_profile = {
+ "ip-version": "ipv6",
+ "subnet-address": "2001:0200:0001::/48",
+ "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
+ "dhcp-params": {
+ "enabled": True,
+ "start-address": "2001:0200:0001::0010",
+ "count": 25,
+ },
+ }
+ expected_result = {
+ "ip_version": "IPv6",
+ "subnet_address": "2001:0200:0001::/48",
+ "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
+ "dhcp_enabled": True,
+ "dhcp_start_address": "2001:0200:0001::0010",
+ "dhcp_count": 25,
+ }
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertDictEqual(expected_result, result)
+
+ def test__ip_profile_to_ro_with_dns_server(self):
+ ip_profile = {
+ "ip-version": "ipv4",
+ "subnet-address": "192.168.0.0/24",
+ "gateway-address": "192.168.0.254",
+ "dhcp-params": {
+ "enabled": True,
+ "start-address": "192.168.0.10",
+ "count": 25,
+ },
+ "dns-server": [
+ {
+ "address": "8.8.8.8",
+ },
+ {
+ "address": "1.1.1.1",
+ },
+ {
+ "address": "1.0.0.1",
+ },
+ ],
+ }
+ expected_result = {
+ "ip_version": "IPv4",
+ "subnet_address": "192.168.0.0/24",
+ "gateway_address": "192.168.0.254",
+ "dhcp_enabled": True,
+ "dhcp_start_address": "192.168.0.10",
+ "dhcp_count": 25,
+ "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
+ }
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertDictEqual(expected_result, result)
+
+ def test__ip_profile_to_ro_with_security_group(self):
+ ip_profile = {
+ "ip-version": "ipv4",
+ "subnet-address": "192.168.0.0/24",
+ "gateway-address": "192.168.0.254",
+ "dhcp-params": {
+ "enabled": True,
+ "start-address": "192.168.0.10",
+ "count": 25,
+ },
+ "security-group": {
+ "some-security-group": "here",
+ },
+ }
+ expected_result = {
+ "ip_version": "IPv4",
+ "subnet_address": "192.168.0.0/24",
+ "gateway_address": "192.168.0.254",
+ "dhcp_enabled": True,
+ "dhcp_start_address": "192.168.0.10",
+ "dhcp_count": 25,
+ "security_group": {
+ "some-security-group": "here",
+ },
+ }
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertDictEqual(expected_result, result)
+
+ def test__ip_profile_to_ro(self):
+ ip_profile = {
+ "ip-version": "ipv4",
+ "subnet-address": "192.168.0.0/24",
+ "gateway-address": "192.168.0.254",
+ "dhcp-params": {
+ "enabled": True,
+ "start-address": "192.168.0.10",
+ "count": 25,
+ },
+ "dns-server": [
+ {
+ "address": "8.8.8.8",
+ },
+ {
+ "address": "1.1.1.1",
+ },
+ {
+ "address": "1.0.0.1",
+ },
+ ],
+ "security-group": {
+ "some-security-group": "here",
+ },
+ }
+ expected_result = {
+ "ip_version": "IPv4",
+ "subnet_address": "192.168.0.0/24",
+ "gateway_address": "192.168.0.254",
+ "dhcp_enabled": True,
+ "dhcp_start_address": "192.168.0.10",
+ "dhcp_count": 25,
+ "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
+ "security_group": {
+ "some-security-group": "here",
+ },
+ }
+
+ result = Ns._ip_profile_to_ro(
+ ip_profile=ip_profile,
+ )
+
+ self.assertDictEqual(expected_result, result)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_empty_params(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "provider_network": "some-profile-here",
+ }
+ target_record_id = ""
+ expected_result = {
+ "params": {
+ "net_name": "ns-name-vld-name",
+ "net_type": "bridge",
+ "ip_profile": {
+ "some_ip_profile": "here",
+ },
+ "provider_network_profile": "some-profile-here",
+ }
+ }
+
+ ip_profile_to_ro.return_value = {
+ "some_ip_profile": "here",
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertTrue(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_vim_info_sdn(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "sdn": "some-sdn",
+ "sdn-ports": ["some", "ports", "here"],
+ "vlds": ["some", "vlds", "here"],
+ "type": "sdn-type",
+ }
+ target_record_id = "vld.sdn.something"
+ expected_result = {
+ "params": {
+ "sdn-ports": ["some", "ports", "here"],
+ "vlds": ["some", "vlds", "here"],
+ "type": "sdn-type",
+ }
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertFalse(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_vim_info_sdn_target_vim(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "sdn": "some-sdn",
+ "sdn-ports": ["some", "ports", "here"],
+ "vlds": ["some", "vlds", "here"],
+ "target_vim": "some-vim",
+ "type": "sdn-type",
+ }
+ target_record_id = "vld.sdn.something"
+ expected_result = {
+ "depends_on": ["some-vim vld.sdn"],
+ "params": {
+ "sdn-ports": ["some", "ports", "here"],
+ "vlds": ["some", "vlds", "here"],
+ "target_vim": "some-vim",
+ "type": "sdn-type",
+ },
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertFalse(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_vim_network_name(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "vim_network_name": "some-network-name",
+ }
+ target_record_id = "vld.sdn.something"
+ expected_result = {
+ "find_params": {
+ "filter_dict": {
+ "name": "some-network-name",
+ },
+ },
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertFalse(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_vim_network_id(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "vim_network_id": "some-network-id",
+ }
+ target_record_id = "vld.sdn.something"
+ expected_result = {
+ "find_params": {
+ "filter_dict": {
+ "id": "some-network-id",
+ },
+ },
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertFalse(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_mgmt_network(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "id": "vld-id",
+ "name": "vld-name",
+ "mgmt-network": "some-mgmt-network",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {}
+ target_record_id = "vld.sdn.something"
+ expected_result = {
+ "find_params": {
+ "mgmt": True,
+ "name": "vld-id",
+ },
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertFalse(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_underlay_eline(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ "underlay": "some-underlay-here",
+ "type": "ELINE",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "provider_network": "some-profile-here",
+ }
+ target_record_id = ""
+ expected_result = {
+ "params": {
+ "ip_profile": {
+ "some_ip_profile": "here",
+ },
+ "net_name": "ns-name-vld-name",
+ "net_type": "ptp",
+ "provider_network_profile": "some-profile-here",
+ }
+ }
+
+ ip_profile_to_ro.return_value = {
+ "some_ip_profile": "here",
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertTrue(ip_profile_to_ro.called)
+
+ @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
+ def test__process_net_params_with_underlay_elan(
+ self,
+ ip_profile_to_ro,
+ ):
+ target_vld = {
+ "name": "vld-name",
+ "underlay": "some-underlay-here",
+ "type": "ELAN",
+ }
+ indata = {
+ "name": "ns-name",
+ }
+ vim_info = {
+ "provider_network": "some-profile-here",
+ }
+ target_record_id = ""
+ expected_result = {
+ "params": {
+ "ip_profile": {
+ "some_ip_profile": "here",
+ },
+ "net_name": "ns-name-vld-name",
+ "net_type": "data",
+ "provider_network_profile": "some-profile-here",
+ }
+ }
+
+ ip_profile_to_ro.return_value = {
+ "some_ip_profile": "here",
+ }
+
+ result = Ns._process_net_params(
+ target_vld=target_vld,
+ indata=indata,
+ vim_info=vim_info,
+ target_record_id=target_record_id,
+ )
+
+ self.assertDictEqual(expected_result, result)
+ self.assertTrue(ip_profile_to_ro.called)
+
+ def test__get_cloud_init_exception(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = None
+
+ location = ""
+
+ with self.assertRaises(NsException):
+ Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ def test__get_cloud_init_file_fs_exception(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = None
+
+ location = "vnfr_id_123456:file:test_file"
+ db_mock.get_one.return_value = {
+ "_admin": {
+ "storage": {
+ "folder": "/home/osm",
+ "pkg-dir": "vnfr_test_dir",
+ },
+ },
+ }
+
+ with self.assertRaises(NsException):
+ Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ def test__get_cloud_init_file(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = MagicMock(name="filesystem mock")
+ file_mock = MagicMock(name="file mock")
+
+ location = "vnfr_id_123456:file:test_file"
+ cloud_init_content = "this is a cloud init file content"
+
+ db_mock.get_one.return_value = {
+ "_admin": {
+ "storage": {
+ "folder": "/home/osm",
+ "pkg-dir": "vnfr_test_dir",
+ },
+ },
+ }
+ fs_mock.file_open.return_value = file_mock
+ file_mock.__enter__.return_value.read.return_value = cloud_init_content
+
+ result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ self.assertEqual(cloud_init_content, result)
+
+ def test__get_cloud_init_vdu(self):
+ db_mock = MagicMock(name="database mock")
+ fs_mock = None
+
+ location = "vnfr_id_123456:vdu:0"
+ cloud_init_content = "this is a cloud init file content"
+
+ db_mock.get_one.return_value = {
+ "vdu": {
+ 0: {
+ "cloud-init": cloud_init_content,
+ },
+ },
+ }
+
+ result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
+
+ self.assertEqual(cloud_init_content, result)
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_undefined_error(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = UndefinedError("UndefinedError occurred.")
+
+ with self.assertRaises(NsException):
+ Ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_template_error(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = TemplateError("TemplateError occurred.")
+
+ with self.assertRaises(NsException):
+ Ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ @patch("jinja2.Environment.__init__")
+ def test__parse_jinja2_template_not_found(self, env_mock: Mock):
+ cloud_init_content = None
+ params = None
+ context = None
+
+ env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")
+
+ with self.assertRaises(NsException):
+ Ns._parse_jinja2(
+ cloud_init_content=cloud_init_content, params=params, context=context
+ )
+
+ def test__parse_jinja2(self):
+ pass
+
+ def test__process_vdu_params_empty_kargs(self):
+ pass
+
+ def test__process_vdu_params_interface_ns_vld_id(self):
+ pass
+
+ def test__process_vdu_params_interface_vnf_vld_id(self):
+ pass
+
+ def test__process_vdu_params_interface_unknown(self):
+ pass
+
+ def test__process_vdu_params_interface_port_security_enabled(self):
+ pass
+
+ def test__process_vdu_params_interface_port_security_disable_strategy(self):
+ pass
+
+ def test__process_vdu_params_interface_sriov(self):
+ pass
+
+ def test__process_vdu_params_interface_pci_passthrough(self):
+ pass
+
+ def test__process_vdu_params_interface_om_mgmt(self):
+ pass
+
+ def test__process_vdu_params_interface_mgmt_interface(self):
+ pass
+
+ def test__process_vdu_params_interface_mgmt_vnf(self):
+ pass
+
+ def test__process_vdu_params_interface_bridge(self):
+ pass
+
+ def test__process_vdu_params_interface_ip_address(self):
+ pass
+
+ def test__process_vdu_params_interface_mac_address(self):
+ pass
+
+ def test__process_vdu_params_vdu_cloud_init_missing(self):
+ pass
+
+ def test__process_vdu_params_vdu_cloud_init_present(self):
+ pass
+
+ def test__process_vdu_params_vdu_boot_data_drive(self):
+ pass
+
+ def test__process_vdu_params_vdu_ssh_keys(self):
+ pass
+
+ def test__process_vdu_params_vdu_ssh_access_required(self):
+ pass
+
+ @patch("osm_ng_ro.ns.Ns._get_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._parse_jinja2")
+ def test__process_vdu_params_vdu_persistent_root_volume(
+ self, get_cloud_init, parse_jinja2
+ ):
+ db = MagicMock(name="database mock")
+ kwargs = {
+ "db": db,
+ "vdu2cloud_init": {},
+ "vnfr": {
+ "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
+ "member-vnf-index-ref": "vnf-several-volumes",
+ },
+ }
+ get_cloud_init.return_value = {}
+ parse_jinja2.return_value = {}
+ db.get_one.return_value = {
+ "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
+ "df": [
+ {
+ "id": "default-df",
+ "vdu-profile": [
+ {"id": "several_volumes-VM", "min-number-of-instances": 1}
+ ],
+ }
+ ],
+ "id": "several_volumes-vnf",
+ "product-name": "several_volumes-vnf",
+ "vdu": [
+ {
+ "id": "several_volumes-VM",
+ "name": "several_volumes-VM",
+ "sw-image-desc": "ubuntu20.04",
+ "alternative-sw-image-desc": [
+ "ubuntu20.04-aws",
+ "ubuntu20.04-azure",
+ ],
+ "virtual-storage-desc": [
+ "persistent-root-volume",
+ "persistent-volume2",
+ "ephemeral-volume",
+ ],
+ }
+ ],
+ "version": "1.0",
+ "virtual-storage-desc": [
+ {
+ "id": "persistent-volume2",
+ "type-of-storage": "persistent-storage:persistent-storage",
+ "size-of-storage": "10",
+ },
+ {
+ "id": "persistent-root-volume",
+ "type-of-storage": "persistent-storage:persistent-storage",
+ "size-of-storage": "10",
+ },
+ {
+ "id": "ephemeral-volume",
+ "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
+ "size-of-storage": "1",
+ },
+ ],
+ "_admin": {
+ "storage": {
+ "fs": "mongo",
+ "path": "/app/storage/",
+ },
+ "type": "vnfd",
+ },
+ }
+
+ target_vdu = {
+ "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
+ "ns-flavor-id": "0",
+ "ns-image-id": "0",
+ "vdu-name": "several_volumes-VM",
+ "interfaces": [
+ {
+ "name": "vdu-eth0",
+ "ns-vld-id": "mgmtnet",
+ }
+ ],
+ "virtual-storages": [
+ {
+ "id": "persistent-volume2",
+ "size-of-storage": "10",
+ "type-of-storage": "persistent-storage:persistent-storage",
+ },
+ {
+ "id": "persistent-root-volume",
+ "size-of-storage": "10",
+ "type-of-storage": "persistent-storage:persistent-storage",
+ },
+ {
+ "id": "ephemeral-volume",
+ "size-of-storage": "1",
+ "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
+ },
+ ],
+ }
+ indata = {
+ "name": "sample_name",
+ }
+ expected_result = [{"image_id": "ubuntu20.04", "size": "10"}, {"size": "10"}]
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **kwargs
+ )
+ self.assertEqual(
+ expected_result, result["params"]["disk_list"], "Wrong Disk List"
+ )
+
+ @patch("osm_ng_ro.ns.Ns._get_cloud_init")
+ @patch("osm_ng_ro.ns.Ns._parse_jinja2")
+ def test__process_vdu_params_vdu_without_persistent_storage(
+ self, get_cloud_init, parse_jinja2
+ ):
+ db = MagicMock(name="database mock")
+ kwargs = {
+ "db": db,
+ "vdu2cloud_init": {},
+ "vnfr": {
+ "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
+ "member-vnf-index-ref": "vnf-several-volumes",
+ },
+ }
+ get_cloud_init.return_value = {}
+ parse_jinja2.return_value = {}
+ db.get_one.return_value = {
+ "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
+ "df": [
+ {
+ "id": "default-df",
+ "vdu-profile": [
+ {"id": "without_volumes-VM", "min-number-of-instances": 1}
+ ],
+ }
+ ],
+ "id": "without_volumes-vnf",
+ "product-name": "without_volumes-vnf",
+ "vdu": [
+ {
+ "id": "without_volumes-VM",
+ "name": "without_volumes-VM",
+ "sw-image-desc": "ubuntu20.04",
+ "alternative-sw-image-desc": [
+ "ubuntu20.04-aws",
+ "ubuntu20.04-azure",
+ ],
+ "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
+ }
+ ],
+ "version": "1.0",
+ "virtual-storage-desc": [
+ {"id": "root-volume", "size-of-storage": "10"},
+ {
+ "id": "ephemeral-volume",
+ "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
+ "size-of-storage": "1",
+ },
+ ],
+ "_admin": {
+ "storage": {
+ "fs": "mongo",
+ "path": "/app/storage/",
+ },
+ "type": "vnfd",
+ },
+ }
+
+ target_vdu = {
+ "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
+ "ns-flavor-id": "0",
+ "ns-image-id": "0",
+ "vdu-name": "without_volumes-VM",
+ "interfaces": [
+ {
+ "name": "vdu-eth0",
+ "ns-vld-id": "mgmtnet",
+ }
+ ],
+ "virtual-storages": [
+ {
+ "id": "root-volume",
+ "size-of-storage": "10",
+ },
+ {
+ "id": "ephemeral-volume",
+ "size-of-storage": "1",
+ "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
+ },
+ ],
+ }
+ indata = {
+ "name": "sample_name",
+ }
+ expected_result = []
+ result = Ns._process_vdu_params(
+ target_vdu, indata, vim_info=None, target_record_id=None, **kwargs
+ )
+ self.assertEqual(
+ expected_result, result["params"]["disk_list"], "Wrong Disk List"
+ )
+
+ def test__process_vdu_params(self):
+ pass
+
+ @patch("osm_ng_ro.ns.Ns._assign_vim")
+ def test__rebuild_start_stop_task(self, assign_vim):
+ self.ns = Ns()
+ extra_dict = {}
+ actions = ["start", "stop", "rebuild"]
+ vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
+ vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
+ vdu_index = "0"
+ action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
+ nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
+ task_index = 0
+ target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
+ t = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
+ for action in actions:
+ expected_result = {
+ "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
+ "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
+ "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
+ "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
+ "status": "SCHEDULED",
+ "action": "EXEC",
+ "item": "update",
+ "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
+ "target_record_id": t,
+ "params": {
+ "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
+ "action": action,
+ },
+ }
+ extra_dict["params"] = {
+ "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
+ "action": action,
+ }
+ task = self.ns.rebuild_start_stop_task(
+ vdu_id,
+ vnf_id,
+ vdu_index,
+ action_id,
+ nsr_id,
+ task_index,
+ target_vim,
+ extra_dict,
+ )
+ self.assertEqual(task.get("action_id"), action_id)
+ self.assertEqual(task.get("nsr_id"), nsr_id)
+ self.assertEqual(task.get("target_id"), target_vim)
+ self.assertDictEqual(task, expected_result)
+
+ @patch("osm_ng_ro.ns.Ns._assign_vim")
+ def test_verticalscale_task(self, assign_vim):
+ self.ns = Ns()
+ extra_dict = {}
+ vdu_index = "1"
+ action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
+ nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
+ task_index = 1
+ target_record_id = (
+ "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
+ "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
+ )
+
+ expected_result = {
+ "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
+ "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
+ "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
+ "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
+ "status": "SCHEDULED",
+ "action": "EXEC",
+ "item": "verticalscale",
+ "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
+ "target_record_id": target_record_id,
+ "params": {
+ "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
+ "flavor_dict": "flavor_dict",
+ },
+ }
+ vdu = {
+ "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
+ "vim_info": {
+ "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
+ },
+ }
+ vnf = {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
+ extra_dict["params"] = {
+ "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
+ "flavor_dict": "flavor_dict",
+ }
+ task = self.ns.verticalscale_task(
+ vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
+ )
+
+ self.assertDictEqual(task, expected_result)
+
+ @patch("osm_ng_ro.ns.Ns._assign_vim")
+ def test_migrate_task(self, assign_vim):
+ self.ns = Ns()
+ extra_dict = {}
+ vdu_index = "1"
+ action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
+ nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
+ task_index = 1
+ target_record_id = (
+ "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
+ "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
+ )
+
+ expected_result = {
+ "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
+ "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
+ "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
+ "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
+ "status": "SCHEDULED",
+ "action": "EXEC",
+ "item": "migrate",
+ "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
+ "target_record_id": target_record_id,
+ "params": {
+ "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
+ "migrate_host": "migrateToHost",
+ },
+ }
+ vdu = {
+ "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
+ "vim_info": {
+ "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
+ },
+ }
+ vnf = {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
+ extra_dict["params"] = {
+ "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
+ "migrate_host": "migrateToHost",
+ }
+ task = self.ns.migrate_task(
+ vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
+ )
+
+ self.assertDictEqual(task, expected_result)