1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
76 "id": "ephemeral-volume",
77 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
78 "size-of-storage": "1",
84 "path": "/app/storage/",
89 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
90 task_by_target_record_id
= {
91 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
92 "extra_dict": {"params": {"net_type": "SR-IOV"}}
95 interfaces_wthout_positions
= [
106 "ns-vld-id": "mgmtnet",
109 interfaces_wth_all_positions
= [
122 "ns-vld-id": "mgmtnet",
126 target_vdu_wth_persistent_storage
= {
127 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
130 "vdu-name": "several_volumes-VM",
134 "ns-vld-id": "mgmtnet",
137 "virtual-storages": [
139 "id": "persistent-volume2",
140 "size-of-storage": "10",
141 "type-of-storage": "persistent-storage:persistent-storage",
144 "id": "persistent-root-volume",
145 "size-of-storage": "10",
146 "type-of-storage": "persistent-storage:persistent-storage",
149 "id": "ephemeral-volume",
150 "size-of-storage": "1",
151 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
# Module-level mocks shared by the tests in this file.
db = MagicMock(name="database mock")
# Labelled separately from ``db`` so reprs and assertion failures identify
# which mock was involved (it previously reused the "database mock" label).
# NOTE(review): presumably the filesystem/storage handle - confirm with callers.
fs = MagicMock(name="fs mock")

# Record identifiers used throughout the fixtures.
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"

# Prefixes are derived from the ids above so the two can never drift apart.
ns_preffix = f"nsrs:{nsr_id}"
vnf_preffix = f"vnfrs:{vnfr_id}"
162 "name": "sample_name",
164 expected_extra_dict
= {
166 f
"{ns_preffix}:image.0",
167 f
"{ns_preffix}:flavor.0",
170 "affinity_group_list": [],
171 "availability_zone_index": None,
172 "availability_zone_list": None,
173 "cloud_config": None,
174 "description": "several_volumes-VM",
176 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
177 "image_id": f
"TASK-{ns_preffix}:image.0",
178 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
184 expected_extra_dict2
= {
186 f
"{ns_preffix}:image.0",
187 f
"{ns_preffix}:flavor.0",
190 "affinity_group_list": [],
191 "availability_zone_index": None,
192 "availability_zone_list": None,
193 "cloud_config": None,
194 "description": "without_volumes-VM",
196 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
197 "image_id": f
"TASK-{ns_preffix}:image.0",
198 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
203 tasks_by_target_record_id
= {
204 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
207 "net_type": "SR-IOV",
214 "vdu2cloud_init": {},
216 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
217 "member-vnf-index-ref": "vnf-several-volumes",
220 vnfd_wthout_persistent_storage
= {
221 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
225 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
228 "id": "without_volumes-vnf",
229 "product-name": "without_volumes-vnf",
232 "id": "without_volumes-VM",
233 "name": "without_volumes-VM",
234 "sw-image-desc": "ubuntu20.04",
235 "alternative-sw-image-desc": [
239 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
243 "virtual-storage-desc": [
244 {"id": "root-volume", "size-of-storage": "10"},
246 "id": "ephemeral-volume",
247 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
248 "size-of-storage": "1",
254 "path": "/app/storage/",
260 target_vdu_wthout_persistent_storage
= {
261 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
264 "vdu-name": "without_volumes-VM",
268 "ns-vld-id": "mgmtnet",
271 "virtual-storages": [
274 "size-of-storage": "10",
277 "id": "ephemeral-volume",
278 "size-of-storage": "1",
279 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
283 cloud_init_content
= """
288 overwrite: {{is_override}}
291 - [ sh, -xc, "echo $(date) '{{command}}'" ]
302 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
class CopyingMock(MagicMock):
    """MagicMock variant that records deep copies of its call arguments.

    A plain mock stores references to the objects it was called with, so a
    caller mutating an argument afterwards also changes the recorded call.
    Copying at call time freezes the arguments as they were when the call
    happened.
    """

    def __call__(self, *args, **kwargs):
        """Deep-copy positional and keyword arguments, then record the call."""
        frozen_args = deepcopy(args)
        frozen_kwargs = deepcopy(kwargs)
        return super().__call__(*frozen_args, **frozen_kwargs)
313 class TestNs(unittest
.TestCase
):
317 def test__create_task_without_extra_dict(self
):
319 "target_id": "vim_openstack_1",
320 "action_id": "123456",
322 "task_id": "123456:1",
323 "status": "SCHEDULED",
326 "target_record": "test_target_record",
327 "target_record_id": "test_target_record_id",
330 "action_id": "123456",
335 task
= Ns
._create
_task
(
336 deployment_info
=deployment_info
,
337 target_id
="vim_openstack_1",
340 target_record
="test_target_record",
341 target_record_id
="test_target_record_id",
344 self
.assertEqual(deployment_info
.get("task_index"), 2)
345 self
.assertDictEqual(task
, expected_result
)
347 def test__create_task(self
):
349 "target_id": "vim_openstack_1",
350 "action_id": "123456",
352 "task_id": "123456:1",
353 "status": "SCHEDULED",
356 "target_record": "test_target_record",
357 "target_record_id": "test_target_record_id",
358 # values coming from extra_dict
359 "params": "test_params",
360 "find_params": "test_find_params",
361 "depends_on": "test_depends_on",
364 "action_id": "123456",
369 task
= Ns
._create
_task
(
370 deployment_info
=deployment_info
,
371 target_id
="vim_openstack_1",
374 target_record
="test_target_record",
375 target_record_id
="test_target_record_id",
377 "params": "test_params",
378 "find_params": "test_find_params",
379 "depends_on": "test_depends_on",
383 self
.assertEqual(deployment_info
.get("task_index"), 2)
384 self
.assertDictEqual(task
, expected_result
)
386 @patch("osm_ng_ro.ns.time")
387 def test__create_ro_task(self
, mock_time
: Mock
):
388 now
= 1637324838.994551
389 mock_time
.return_value
= now
391 "target_id": "vim_openstack_1",
392 "action_id": "123456",
394 "task_id": "123456:1",
395 "status": "SCHEDULED",
398 "target_record": "test_target_record",
399 "target_record_id": "test_target_record_id",
400 # values coming from extra_dict
401 "params": "test_params",
402 "find_params": "test_find_params",
403 "depends_on": "test_depends_on",
409 "target_id": "vim_openstack_1",
412 "created_items": None,
426 ro_task
= Ns
._create
_ro
_task
(
427 target_id
="vim_openstack_1",
431 self
.assertDictEqual(ro_task
, expected_result
)
433 def test__process_image_params_with_empty_target_image(self
):
439 result
= Ns
._process
_image
_params
(
440 target_image
=target_image
,
443 target_record_id
=None,
446 self
.assertDictEqual(expected_result
, result
)
448 def test__process_image_params_with_wrong_target_image(self
):
453 "no_image": "to_see_here",
456 result
= Ns
._process
_image
_params
(
457 target_image
=target_image
,
460 target_record_id
=None,
463 self
.assertDictEqual(expected_result
, result
)
465 def test__process_image_params_with_image(self
):
477 result
= Ns
._process
_image
_params
(
478 target_image
=target_image
,
481 target_record_id
=None,
484 self
.assertDictEqual(expected_result
, result
)
486 def test__process_image_params_with_vim_image_id(self
):
495 "vim_image_id": "123456",
498 result
= Ns
._process
_image
_params
(
499 target_image
=target_image
,
502 target_record_id
=None,
505 self
.assertDictEqual(expected_result
, result
)
507 def test__process_image_params_with_image_checksum(self
):
511 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
516 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
519 result
= Ns
._process
_image
_params
(
520 target_image
=target_image
,
523 target_record_id
=None,
526 self
.assertDictEqual(expected_result
, result
)
528 def test__get_resource_allocation_params_with_empty_target_image(self
):
530 quota_descriptor
= {}
532 result
= Ns
._get
_resource
_allocation
_params
(
533 quota_descriptor
=quota_descriptor
,
536 self
.assertDictEqual(expected_result
, result
)
538 def test__get_resource_allocation_params_with_wrong_target_image(self
):
541 "no_quota": "present_here",
544 result
= Ns
._get
_resource
_allocation
_params
(
545 quota_descriptor
=quota_descriptor
,
548 self
.assertDictEqual(expected_result
, result
)
550 def test__get_resource_allocation_params_with_limit(self
):
558 result
= Ns
._get
_resource
_allocation
_params
(
559 quota_descriptor
=quota_descriptor
,
562 self
.assertDictEqual(expected_result
, result
)
564 def test__get_resource_allocation_params_with_reserve(self
):
572 result
= Ns
._get
_resource
_allocation
_params
(
573 quota_descriptor
=quota_descriptor
,
576 self
.assertDictEqual(expected_result
, result
)
578 def test__get_resource_allocation_params_with_shares(self
):
586 result
= Ns
._get
_resource
_allocation
_params
(
587 quota_descriptor
=quota_descriptor
,
590 self
.assertDictEqual(expected_result
, result
)
592 def test__get_resource_allocation_params(self
):
604 result
= Ns
._get
_resource
_allocation
_params
(
605 quota_descriptor
=quota_descriptor
,
608 self
.assertDictEqual(expected_result
, result
)
610 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
611 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
619 result
= Ns
._process
_guest
_epa
_quota
_params
(
620 guest_epa_quota
=guest_epa_quota
,
621 epa_vcpu_set
=epa_vcpu_set
,
624 self
.assertDictEqual(expected_result
, result
)
625 self
.assertFalse(resource_allocation
.called
)
627 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
628 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
636 result
= Ns
._process
_guest
_epa
_quota
_params
(
637 guest_epa_quota
=guest_epa_quota
,
638 epa_vcpu_set
=epa_vcpu_set
,
641 self
.assertDictEqual(expected_result
, result
)
642 self
.assertFalse(resource_allocation
.called
)
644 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
645 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
651 "no-quota": "nothing",
655 result
= Ns
._process
_guest
_epa
_quota
_params
(
656 guest_epa_quota
=guest_epa_quota
,
657 epa_vcpu_set
=epa_vcpu_set
,
660 self
.assertDictEqual(expected_result
, result
)
661 self
.assertFalse(resource_allocation
.called
)
663 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
664 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
670 "no-quota": "nothing",
674 result
= Ns
._process
_guest
_epa
_quota
_params
(
675 guest_epa_quota
=guest_epa_quota
,
676 epa_vcpu_set
=epa_vcpu_set
,
679 self
.assertDictEqual(expected_result
, result
)
680 self
.assertFalse(resource_allocation
.called
)
682 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
683 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
697 result
= Ns
._process
_guest
_epa
_quota
_params
(
698 guest_epa_quota
=guest_epa_quota
,
699 epa_vcpu_set
=epa_vcpu_set
,
702 self
.assertDictEqual(expected_result
, result
)
703 self
.assertFalse(resource_allocation
.called
)
705 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
706 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
726 resource_allocation_param
= {
731 resource_allocation
.return_value
= {
737 result
= Ns
._process
_guest
_epa
_quota
_params
(
738 guest_epa_quota
=guest_epa_quota
,
739 epa_vcpu_set
=epa_vcpu_set
,
742 resource_allocation
.assert_called_once_with(resource_allocation_param
)
743 self
.assertDictEqual(expected_result
, result
)
745 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
746 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
766 resource_allocation_param
= {
771 resource_allocation
.return_value
= {
777 result
= Ns
._process
_guest
_epa
_quota
_params
(
778 guest_epa_quota
=guest_epa_quota
,
779 epa_vcpu_set
=epa_vcpu_set
,
782 resource_allocation
.assert_called_once_with(resource_allocation_param
)
783 self
.assertDictEqual(expected_result
, result
)
785 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
786 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
806 resource_allocation_param
= {
811 resource_allocation
.return_value
= {
817 result
= Ns
._process
_guest
_epa
_quota
_params
(
818 guest_epa_quota
=guest_epa_quota
,
819 epa_vcpu_set
=epa_vcpu_set
,
822 resource_allocation
.assert_called_once_with(resource_allocation_param
)
823 self
.assertDictEqual(expected_result
, result
)
825 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
826 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
846 resource_allocation_param
= {
851 resource_allocation
.return_value
= {
857 result
= Ns
._process
_guest
_epa
_quota
_params
(
858 guest_epa_quota
=guest_epa_quota
,
859 epa_vcpu_set
=epa_vcpu_set
,
862 resource_allocation
.assert_called_once_with(resource_allocation_param
)
863 self
.assertDictEqual(expected_result
, result
)
865 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
866 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
886 resource_allocation_param
= {
891 resource_allocation
.return_value
= {
897 result
= Ns
._process
_guest
_epa
_quota
_params
(
898 guest_epa_quota
=guest_epa_quota
,
899 epa_vcpu_set
=epa_vcpu_set
,
902 resource_allocation
.assert_called_once_with(resource_allocation_param
)
903 self
.assertDictEqual(expected_result
, result
)
905 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
906 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
926 resource_allocation_param
= {
931 resource_allocation
.return_value
= {
937 result
= Ns
._process
_guest
_epa
_quota
_params
(
938 guest_epa_quota
=guest_epa_quota
,
939 epa_vcpu_set
=epa_vcpu_set
,
942 resource_allocation
.assert_called_once_with(resource_allocation_param
)
943 self
.assertDictEqual(expected_result
, result
)
945 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
946 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
966 resource_allocation_param
= {
971 resource_allocation
.return_value
= {
977 result
= Ns
._process
_guest
_epa
_quota
_params
(
978 guest_epa_quota
=guest_epa_quota
,
979 epa_vcpu_set
=epa_vcpu_set
,
982 resource_allocation
.assert_called_once_with(resource_allocation_param
)
983 self
.assertDictEqual(expected_result
, result
)
985 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
986 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1031 resource_allocation
.return_value
= {
1037 result
= Ns
._process
_guest
_epa
_quota
_params
(
1038 guest_epa_quota
=guest_epa_quota
,
1039 epa_vcpu_set
=epa_vcpu_set
,
1042 self
.assertTrue(resource_allocation
.called
)
1043 self
.assertDictEqual(expected_result
, result
)
1045 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1046 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1048 resource_allocation
,
1094 epa_vcpu_set
= False
1096 resource_allocation
.return_value
= {
1102 result
= Ns
._process
_guest
_epa
_quota
_params
(
1103 guest_epa_quota
=guest_epa_quota
,
1104 epa_vcpu_set
=epa_vcpu_set
,
1107 self
.assertTrue(resource_allocation
.called
)
1108 self
.assertDictEqual(expected_result
, result
)
def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    """An empty guest EPA dict yields an empty NUMA list and a False vCPU flag."""
    numa, vcpu_set = Ns._process_guest_epa_numa_params(guest_epa_quota={})

    # Expected value first, matching the argument order used across this file.
    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    """A dict holding only an unrelated key yields an empty NUMA list and a False vCPU flag."""
    unrelated_params = {"no_nume": "here"}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_params
    )

    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    """An empty "numa-node-policy" entry yields an empty NUMA list and a False vCPU flag."""
    empty_policy = {"numa-node-policy": {}}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_policy
    )

    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
1145 def test__process_guest_epa_numa_params_with_no_node(self
):
1146 expected_numa_result
= []
1147 expected_epa_vcpu_set_result
= False
1149 "numa-node-policy": {
1154 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1155 guest_epa_quota
=guest_epa_quota
,
1158 self
.assertEqual(expected_numa_result
, numa_result
)
1159 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1161 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1162 expected_numa_result
= [{"cores": 3}]
1163 expected_epa_vcpu_set_result
= True
1165 "numa-node-policy": {
1174 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1175 guest_epa_quota
=guest_epa_quota
,
1178 self
.assertEqual(expected_numa_result
, numa_result
)
1179 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1181 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1182 expected_numa_result
= [{"paired_threads": 3}]
1183 expected_epa_vcpu_set_result
= True
1185 "numa-node-policy": {
1188 "paired-threads": {"num-paired-threads": "3"},
1194 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1195 guest_epa_quota
=guest_epa_quota
,
1198 self
.assertEqual(expected_numa_result
, numa_result
)
1199 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1201 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1202 expected_numa_result
= [
1204 "paired-threads-id": [("0", "1"), ("4", "5")],
1207 expected_epa_vcpu_set_result
= False
1209 "numa-node-policy": {
1213 "paired-thread-ids": [
1229 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1230 guest_epa_quota
=guest_epa_quota
,
1233 self
.assertEqual(expected_numa_result
, numa_result
)
1234 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1236 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1237 expected_numa_result
= [{"threads": 3}]
1238 expected_epa_vcpu_set_result
= True
1240 "numa-node-policy": {
1249 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1250 guest_epa_quota
=guest_epa_quota
,
1253 self
.assertEqual(expected_numa_result
, numa_result
)
1254 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1256 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1257 expected_numa_result
= [{"memory": 2}]
1258 expected_epa_vcpu_set_result
= False
1260 "numa-node-policy": {
1269 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1270 guest_epa_quota
=guest_epa_quota
,
1273 self
.assertEqual(expected_numa_result
, numa_result
)
1274 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1276 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1277 expected_numa_result
= [
1283 expected_epa_vcpu_set_result
= False
1285 "numa-node-policy": {
1286 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1290 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1291 guest_epa_quota
=guest_epa_quota
,
1294 self
.assertEqual(expected_numa_result
, numa_result
)
1295 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1297 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1298 expected_numa_result
= [
1309 expected_epa_vcpu_set_result
= False
1311 "numa-node-policy": {
1313 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1314 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1319 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1320 guest_epa_quota
=guest_epa_quota
,
1323 self
.assertEqual(expected_numa_result
, numa_result
)
1324 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1326 def test__process_guest_epa_numa_params_with_1_node(self
):
1327 expected_numa_result
= [
1332 "paired_threads": 3,
1333 "paired-threads-id": [("0", "1"), ("4", "5")],
1338 expected_epa_vcpu_set_result
= True
1340 "numa-node-policy": {
1345 "num-paired-threads": "3",
1346 "paired-thread-ids": [
1364 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1365 guest_epa_quota
=guest_epa_quota
,
1368 self
.assertEqual(expected_numa_result
, numa_result
)
1369 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1371 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1372 expected_numa_result
= [
1375 "paired_threads": 3,
1376 "paired-threads-id": [("0", "1"), ("4", "5")],
1382 "paired_threads": 7,
1383 "paired-threads-id": [("2", "3"), ("5", "6")],
1388 expected_epa_vcpu_set_result
= True
1390 "numa-node-policy": {
1395 "num-paired-threads": "3",
1396 "paired-thread-ids": [
1413 "num-paired-threads": "7",
1414 "paired-thread-ids": [
1432 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1433 guest_epa_quota
=guest_epa_quota
,
1436 self
.assertEqual(expected_numa_result
, numa_result
)
1437 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1439 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1440 expected_numa_result
= {}
1441 expected_epa_vcpu_set_result
= False
1442 guest_epa_quota
= {}
1444 epa_vcpu_set
= False
1446 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1447 guest_epa_quota
=guest_epa_quota
,
1448 vcpu_count
=vcpu_count
,
1449 epa_vcpu_set
=epa_vcpu_set
,
1452 self
.assertDictEqual(expected_numa_result
, numa_result
)
1453 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1455 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1456 expected_numa_result
= {}
1457 expected_epa_vcpu_set_result
= False
1459 "no-cpu-pinning-policy": "here",
1462 epa_vcpu_set
= False
1464 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1465 guest_epa_quota
=guest_epa_quota
,
1466 vcpu_count
=vcpu_count
,
1467 epa_vcpu_set
=epa_vcpu_set
,
1470 self
.assertDictEqual(expected_numa_result
, numa_result
)
1471 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1473 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1474 expected_numa_result
= {}
1475 expected_epa_vcpu_set_result
= True
1477 "cpu-pinning-policy": "DEDICATED",
1482 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1483 guest_epa_quota
=guest_epa_quota
,
1484 vcpu_count
=vcpu_count
,
1485 epa_vcpu_set
=epa_vcpu_set
,
1488 self
.assertDictEqual(expected_numa_result
, numa_result
)
1489 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1491 def test__process_guest_epa_cpu_pinning_params_with_threads(self
):
1492 expected_numa_result
= {"threads": 3}
1493 expected_epa_vcpu_set_result
= True
1495 "cpu-pinning-policy": "DEDICATED",
1496 "cpu-thread-pinning-policy": "PREFER",
1499 epa_vcpu_set
= False
1501 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1502 guest_epa_quota
=guest_epa_quota
,
1503 vcpu_count
=vcpu_count
,
1504 epa_vcpu_set
=epa_vcpu_set
,
1507 self
.assertDictEqual(expected_numa_result
, numa_result
)
1508 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1510 def test__process_guest_epa_cpu_pinning_params(self
):
1511 expected_numa_result
= {"cores": 3}
1512 expected_epa_vcpu_set_result
= True
1514 "cpu-pinning-policy": "DEDICATED",
1517 epa_vcpu_set
= False
1519 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1520 guest_epa_quota
=guest_epa_quota
,
1521 vcpu_count
=vcpu_count
,
1522 epa_vcpu_set
=epa_vcpu_set
,
1525 self
.assertDictEqual(expected_numa_result
, numa_result
)
1526 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1528 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1529 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1530 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1531 def test__process_guest_epa_params_with_empty_params(
1533 guest_epa_numa_params
,
1534 guest_epa_cpu_pinning_params
,
1535 guest_epa_quota_params
,
1537 expected_result
= {}
1540 result
= Ns
._process
_epa
_params
(
1541 target_flavor
=target_flavor
,
1544 self
.assertDictEqual(expected_result
, result
)
1545 self
.assertFalse(guest_epa_numa_params
.called
)
1546 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1547 self
.assertFalse(guest_epa_quota_params
.called
)
1549 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1550 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1551 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1552 def test__process_guest_epa_params_with_wrong_params(
1554 guest_epa_numa_params
,
1555 guest_epa_cpu_pinning_params
,
1556 guest_epa_quota_params
,
1558 expected_result
= {}
1560 "no-guest-epa": "here",
1563 result
= Ns
._process
_epa
_params
(
1564 target_flavor
=target_flavor
,
1567 self
.assertDictEqual(expected_result
, result
)
1568 self
.assertFalse(guest_epa_numa_params
.called
)
1569 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1570 self
.assertFalse(guest_epa_quota_params
.called
)
1572 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1573 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1574 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1575 def test__process_guest_epa_params(
1577 guest_epa_numa_params
,
1578 guest_epa_cpu_pinning_params
,
1579 guest_epa_quota_params
,
1582 "mem-policy": "STRICT",
1587 "numa-node-policy": {
1588 "mem-policy": "STRICT",
1593 guest_epa_numa_params
.return_value
= ({}, False)
1594 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1595 guest_epa_quota_params
.return_value
= {}
1597 result
= Ns
._process
_epa
_params
(
1598 target_flavor
=target_flavor
,
1601 self
.assertDictEqual(expected_result
, result
)
1602 self
.assertTrue(guest_epa_numa_params
.called
)
1603 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1604 self
.assertTrue(guest_epa_quota_params
.called
)
1606 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1607 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1608 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1609 def test__process_guest_epa_params_with_mempage_size(
1611 guest_epa_numa_params
,
1612 guest_epa_cpu_pinning_params
,
1613 guest_epa_quota_params
,
1616 "mempage-size": "1G",
1617 "mem-policy": "STRICT",
1622 "mempage-size": "1G",
1623 "numa-node-policy": {
1624 "mem-policy": "STRICT",
1629 guest_epa_numa_params
.return_value
= ({}, False)
1630 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1631 guest_epa_quota_params
.return_value
= {}
1633 result
= Ns
._process
_epa
_params
(
1634 target_flavor
=target_flavor
,
1637 self
.assertDictEqual(expected_result
, result
)
1638 self
.assertTrue(guest_epa_numa_params
.called
)
1639 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1640 self
.assertTrue(guest_epa_quota_params
.called
)
1642 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1643 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1644 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1645 def test__process_guest_epa_params_with_numa(
1647 guest_epa_numa_params
,
1648 guest_epa_cpu_pinning_params
,
1649 guest_epa_quota_params
,
1652 "mempage-size": "1G",
1653 "cpu-pinning-policy": "DEDICATED",
1654 "cpu-thread-pinning-policy": "PREFER",
1659 "paired-threads": 3,
1660 "paired-threads-id": [("0", "1"), ("4", "5")],
1664 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1665 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1666 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1667 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1672 "mempage-size": "1G",
1673 "cpu-pinning-policy": "DEDICATED",
1674 "cpu-thread-pinning-policy": "PREFER",
1675 "numa-node-policy": {
1680 "num-paired-threads": "3",
1681 "paired-thread-ids": [
1720 guest_epa_numa_params
.return_value
= (
1724 "paired-threads": 3,
1725 "paired-threads-id": [("0", "1"), ("4", "5")],
1732 guest_epa_cpu_pinning_params
.return_value
= (
1738 guest_epa_quota_params
.return_value
= {
1761 result
= Ns
._process
_epa
_params
(
1762 target_flavor
=target_flavor
,
1764 self
.assertEqual(expected_result
, result
)
1765 self
.assertTrue(guest_epa_numa_params
.called
)
1766 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1767 self
.assertTrue(guest_epa_quota_params
.called
)
1769 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1770 def test__process_flavor_params_with_empty_target_flavor(
1779 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1784 target_record_id
= ""
1786 with self
.assertRaises(KeyError):
1787 Ns
._process
_flavor
_params
(
1788 target_flavor
=target_flavor
,
1791 target_record_id
=target_record_id
,
1794 self
.assertFalse(epa_params
.called
)
1796 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1797 def test__process_flavor_params_with_wrong_target_flavor(
1803 "no-target-flavor": "here",
1807 target_record_id
= ""
1809 with self
.assertRaises(KeyError):
1810 Ns
._process
_flavor
_params
(
1811 target_flavor
=target_flavor
,
1814 target_record_id
=target_record_id
,
1817 self
.assertFalse(epa_params
.called
)
1819 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1820 def test__process_flavor_params_with_empty_indata(
1845 "memory-mb": "1024",
1850 target_record_id
= ""
1852 epa_params
.return_value
= {}
1854 result
= Ns
._process
_flavor
_params
(
1855 target_flavor
=target_flavor
,
1858 target_record_id
=target_record_id
,
1861 self
.assertTrue(epa_params
.called
)
1862 self
.assertDictEqual(result
, expected_result
)
1864 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1865 def test__process_flavor_params_with_wrong_indata(
1890 "memory-mb": "1024",
1897 target_record_id
= ""
1899 epa_params
.return_value
= {}
1901 result
= Ns
._process
_flavor
_params
(
1902 target_flavor
=target_flavor
,
1905 target_record_id
=target_record_id
,
1908 self
.assertTrue(epa_params
.called
)
1909 self
.assertDictEqual(result
, expected_result
)
1911 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1912 def test__process_flavor_params_with_ephemeral_disk(
1920 db
.get_one
.return_value
= {
1921 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1926 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1930 "id": "without_volumes-vnf",
1931 "product-name": "without_volumes-vnf",
1934 "id": "without_volumes-VM",
1935 "name": "without_volumes-VM",
1936 "sw-image-desc": "ubuntu20.04",
1937 "alternative-sw-image-desc": [
1939 "ubuntu20.04-azure",
1941 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
1945 "virtual-storage-desc": [
1946 {"id": "root-volume", "size-of-storage": "10"},
1948 "id": "ephemeral-volume",
1949 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1950 "size-of-storage": "1",
1956 "path": "/app/storage/",
1984 "memory-mb": "1024",
1992 "ns-flavor-id": "test_id",
1993 "virtual-storages": [
1995 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1996 "size-of-storage": "10",
2001 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2006 target_record_id
= ""
2008 epa_params
.return_value
= {}
2010 result
= Ns
._process
_flavor
_params
(
2011 target_flavor
=target_flavor
,
2014 target_record_id
=target_record_id
,
2018 self
.assertTrue(epa_params
.called
)
2019 self
.assertDictEqual(result
, expected_result
)
2021 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2022 def test__process_flavor_params_with_swap_disk(
2050 "memory-mb": "1024",
2058 "ns-flavor-id": "test_id",
2059 "virtual-storages": [
2061 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2062 "size-of-storage": "20",
2067 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2072 target_record_id
= ""
2074 epa_params
.return_value
= {}
2076 result
= Ns
._process
_flavor
_params
(
2077 target_flavor
=target_flavor
,
2080 target_record_id
=target_record_id
,
2083 self
.assertTrue(epa_params
.called
)
2084 self
.assertDictEqual(result
, expected_result
)
2086 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2087 def test__process_flavor_params_with_persistent_root_disk(
2096 db
.get_one
.return_value
= {
2097 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2102 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2106 "id": "several_volumes-vnf",
2107 "product-name": "several_volumes-vnf",
2110 "id": "several_volumes-VM",
2111 "name": "several_volumes-VM",
2112 "sw-image-desc": "ubuntu20.04",
2113 "alternative-sw-image-desc": [
2115 "ubuntu20.04-azure",
2117 "virtual-storage-desc": [
2118 "persistent-root-volume",
2123 "virtual-storage-desc": [
2125 "id": "persistent-root-volume",
2126 "type-of-storage": "persistent-storage:persistent-storage",
2127 "size-of-storage": "10",
2133 "path": "/app/storage/",
2159 "memory-mb": "1024",
2167 "vdu-name": "several_volumes-VM",
2168 "ns-flavor-id": "test_id",
2169 "virtual-storages": [
2171 "type-of-storage": "persistent-storage:persistent-storage",
2172 "size-of-storage": "10",
2177 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2182 target_record_id
= ""
2184 epa_params
.return_value
= {}
2186 result
= Ns
._process
_flavor
_params
(
2187 target_flavor
=target_flavor
,
2190 target_record_id
=target_record_id
,
2194 self
.assertTrue(epa_params
.called
)
2195 self
.assertDictEqual(result
, expected_result
)
2197 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2198 def test__process_flavor_params_with_epa_params(
2210 "numa": "there-is-numa-here",
2221 "numa": "there-is-numa-here",
2230 "memory-mb": "1024",
2238 "ns-flavor-id": "test_id",
2241 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2246 target_record_id
= ""
2248 epa_params
.return_value
= {
2249 "numa": "there-is-numa-here",
2252 result
= Ns
._process
_flavor
_params
(
2253 target_flavor
=target_flavor
,
2256 target_record_id
=target_record_id
,
2259 self
.assertTrue(epa_params
.called
)
2260 self
.assertDictEqual(result
, expected_result
)
2262 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2263 def test__process_flavor_params(
2272 db
.get_one
.return_value
= {
2273 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2278 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2282 "id": "without_volumes-vnf",
2283 "product-name": "without_volumes-vnf",
2286 "id": "without_volumes-VM",
2287 "name": "without_volumes-VM",
2288 "sw-image-desc": "ubuntu20.04",
2289 "alternative-sw-image-desc": [
2291 "ubuntu20.04-azure",
2293 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2297 "virtual-storage-desc": [
2298 {"id": "root-volume", "size-of-storage": "10"},
2300 "id": "ephemeral-volume",
2301 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2302 "size-of-storage": "1",
2308 "path": "/app/storage/",
2323 "numa": "there-is-numa-here",
2336 "numa": "there-is-numa-here",
2345 "memory-mb": "1024",
2353 "ns-flavor-id": "test_id",
2354 "virtual-storages": [
2356 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2357 "size-of-storage": "10",
2360 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2361 "size-of-storage": "20",
2366 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2371 target_record_id
= ""
2373 epa_params
.return_value
= {
2374 "numa": "there-is-numa-here",
2377 result
= Ns
._process
_flavor
_params
(
2378 target_flavor
=target_flavor
,
2381 target_record_id
=target_record_id
,
2385 self
.assertTrue(epa_params
.called
)
2386 self
.assertDictEqual(result
, expected_result
)
2388 def test__ip_profile_to_ro_with_none(self
):
2391 result
= Ns
._ip
_profile
_to
_ro
(
2392 ip_profile
=ip_profile
,
2395 self
.assertIsNone(result
)
2397 def test__ip_profile_to_ro_with_empty_profile(self
):
2400 result
= Ns
._ip
_profile
_to
_ro
(
2401 ip_profile
=ip_profile
,
2404 self
.assertIsNone(result
)
2406 def test__ip_profile_to_ro_with_wrong_profile(self
):
2408 "no-profile": "here",
2411 "ip_version": "IPv4",
2412 "subnet_address": None,
2413 "gateway_address": None,
2414 "dhcp_enabled": False,
2415 "dhcp_start_address": None,
2419 result
= Ns
._ip
_profile
_to
_ro
(
2420 ip_profile
=ip_profile
,
2423 self
.assertDictEqual(expected_result
, result
)
2425 def test__ip_profile_to_ro_with_ipv4_profile(self
):
2427 "ip-version": "ipv4",
2428 "subnet-address": "192.168.0.0/24",
2429 "gateway-address": "192.168.0.254",
2432 "start-address": "192.168.0.10",
2437 "ip_version": "IPv4",
2438 "subnet_address": "192.168.0.0/24",
2439 "gateway_address": "192.168.0.254",
2440 "dhcp_enabled": True,
2441 "dhcp_start_address": "192.168.0.10",
2445 result
= Ns
._ip
_profile
_to
_ro
(
2446 ip_profile
=ip_profile
,
2449 self
.assertDictEqual(expected_result
, result
)
2451 def test__ip_profile_to_ro_with_ipv6_profile(self
):
2453 "ip-version": "ipv6",
2454 "subnet-address": "2001:0200:0001::/48",
2455 "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2458 "start-address": "2001:0200:0001::0010",
2463 "ip_version": "IPv6",
2464 "subnet_address": "2001:0200:0001::/48",
2465 "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2466 "dhcp_enabled": True,
2467 "dhcp_start_address": "2001:0200:0001::0010",
2471 result
= Ns
._ip
_profile
_to
_ro
(
2472 ip_profile
=ip_profile
,
2475 self
.assertDictEqual(expected_result
, result
)
2477 def test__ip_profile_to_ro_with_dns_server(self
):
2479 "ip-version": "ipv4",
2480 "subnet-address": "192.168.0.0/24",
2481 "gateway-address": "192.168.0.254",
2484 "start-address": "192.168.0.10",
2489 "address": "8.8.8.8",
2492 "address": "1.1.1.1",
2495 "address": "1.0.0.1",
2500 "ip_version": "IPv4",
2501 "subnet_address": "192.168.0.0/24",
2502 "gateway_address": "192.168.0.254",
2503 "dhcp_enabled": True,
2504 "dhcp_start_address": "192.168.0.10",
2506 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2509 result
= Ns
._ip
_profile
_to
_ro
(
2510 ip_profile
=ip_profile
,
2513 self
.assertDictEqual(expected_result
, result
)
2515 def test__ip_profile_to_ro_with_security_group(self
):
2517 "ip-version": "ipv4",
2518 "subnet-address": "192.168.0.0/24",
2519 "gateway-address": "192.168.0.254",
2522 "start-address": "192.168.0.10",
2526 "some-security-group": "here",
2530 "ip_version": "IPv4",
2531 "subnet_address": "192.168.0.0/24",
2532 "gateway_address": "192.168.0.254",
2533 "dhcp_enabled": True,
2534 "dhcp_start_address": "192.168.0.10",
2537 "some-security-group": "here",
2541 result
= Ns
._ip
_profile
_to
_ro
(
2542 ip_profile
=ip_profile
,
2545 self
.assertDictEqual(expected_result
, result
)
2547 def test__ip_profile_to_ro(self
):
2549 "ip-version": "ipv4",
2550 "subnet-address": "192.168.0.0/24",
2551 "gateway-address": "192.168.0.254",
2554 "start-address": "192.168.0.10",
2559 "address": "8.8.8.8",
2562 "address": "1.1.1.1",
2565 "address": "1.0.0.1",
2569 "some-security-group": "here",
2573 "ip_version": "IPv4",
2574 "subnet_address": "192.168.0.0/24",
2575 "gateway_address": "192.168.0.254",
2576 "dhcp_enabled": True,
2577 "dhcp_start_address": "192.168.0.10",
2579 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2581 "some-security-group": "here",
2585 result
= Ns
._ip
_profile
_to
_ro
(
2586 ip_profile
=ip_profile
,
2589 self
.assertDictEqual(expected_result
, result
)
2591 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2592 def test__process_net_params_with_empty_params(
2603 "provider_network": "some-profile-here",
2605 target_record_id
= ""
2608 "net_name": "ns-name-vld-name",
2609 "net_type": "bridge",
2611 "some_ip_profile": "here",
2613 "provider_network_profile": "some-profile-here",
2617 ip_profile_to_ro
.return_value
= {
2618 "some_ip_profile": "here",
2621 result
= Ns
._process
_net
_params
(
2622 target_vld
=target_vld
,
2625 target_record_id
=target_record_id
,
2628 self
.assertDictEqual(expected_result
, result
)
2629 self
.assertTrue(ip_profile_to_ro
.called
)
2631 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2632 def test__process_net_params_with_vim_info_sdn(
2644 "sdn-ports": ["some", "ports", "here"],
2645 "vlds": ["some", "vlds", "here"],
2648 target_record_id
= "vld.sdn.something"
2651 "sdn-ports": ["some", "ports", "here"],
2652 "vlds": ["some", "vlds", "here"],
2657 result
= Ns
._process
_net
_params
(
2658 target_vld
=target_vld
,
2661 target_record_id
=target_record_id
,
2664 self
.assertDictEqual(expected_result
, result
)
2665 self
.assertFalse(ip_profile_to_ro
.called
)
2667 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2668 def test__process_net_params_with_vim_info_sdn_target_vim(
2680 "sdn-ports": ["some", "ports", "here"],
2681 "vlds": ["some", "vlds", "here"],
2682 "target_vim": "some-vim",
2685 target_record_id
= "vld.sdn.something"
2687 "depends_on": ["some-vim vld.sdn"],
2689 "sdn-ports": ["some", "ports", "here"],
2690 "vlds": ["some", "vlds", "here"],
2691 "target_vim": "some-vim",
2696 result
= Ns
._process
_net
_params
(
2697 target_vld
=target_vld
,
2700 target_record_id
=target_record_id
,
2703 self
.assertDictEqual(expected_result
, result
)
2704 self
.assertFalse(ip_profile_to_ro
.called
)
2706 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2707 def test__process_net_params_with_vim_network_name(
2718 "vim_network_name": "some-network-name",
2720 target_record_id
= "vld.sdn.something"
2724 "name": "some-network-name",
2729 result
= Ns
._process
_net
_params
(
2730 target_vld
=target_vld
,
2733 target_record_id
=target_record_id
,
2736 self
.assertDictEqual(expected_result
, result
)
2737 self
.assertFalse(ip_profile_to_ro
.called
)
2739 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2740 def test__process_net_params_with_vim_network_id(
2751 "vim_network_id": "some-network-id",
2753 target_record_id
= "vld.sdn.something"
2757 "id": "some-network-id",
2762 result
= Ns
._process
_net
_params
(
2763 target_vld
=target_vld
,
2766 target_record_id
=target_record_id
,
2769 self
.assertDictEqual(expected_result
, result
)
2770 self
.assertFalse(ip_profile_to_ro
.called
)
2772 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2773 def test__process_net_params_with_mgmt_network(
2780 "mgmt-network": "some-mgmt-network",
2786 target_record_id
= "vld.sdn.something"
2794 result
= Ns
._process
_net
_params
(
2795 target_vld
=target_vld
,
2798 target_record_id
=target_record_id
,
2801 self
.assertDictEqual(expected_result
, result
)
2802 self
.assertFalse(ip_profile_to_ro
.called
)
2804 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2805 def test__process_net_params_with_underlay_eline(
2811 "underlay": "some-underlay-here",
2818 "provider_network": "some-profile-here",
2820 target_record_id
= ""
2824 "some_ip_profile": "here",
2826 "net_name": "ns-name-vld-name",
2828 "provider_network_profile": "some-profile-here",
2832 ip_profile_to_ro
.return_value
= {
2833 "some_ip_profile": "here",
2836 result
= Ns
._process
_net
_params
(
2837 target_vld
=target_vld
,
2840 target_record_id
=target_record_id
,
2843 self
.assertDictEqual(expected_result
, result
)
2844 self
.assertTrue(ip_profile_to_ro
.called
)
2846 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2847 def test__process_net_params_with_underlay_elan(
2853 "underlay": "some-underlay-here",
2860 "provider_network": "some-profile-here",
2862 target_record_id
= ""
2866 "some_ip_profile": "here",
2868 "net_name": "ns-name-vld-name",
2870 "provider_network_profile": "some-profile-here",
2874 ip_profile_to_ro
.return_value
= {
2875 "some_ip_profile": "here",
2878 result
= Ns
._process
_net
_params
(
2879 target_vld
=target_vld
,
2882 target_record_id
=target_record_id
,
2885 self
.assertDictEqual(expected_result
, result
)
2886 self
.assertTrue(ip_profile_to_ro
.called
)
2888 def test__get_cloud_init_exception(self
):
2889 db_mock
= MagicMock(name
="database mock")
2894 with self
.assertRaises(NsException
):
2895 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2897 def test__get_cloud_init_file_fs_exception(self
):
2898 db_mock
= MagicMock(name
="database mock")
2901 location
= "vnfr_id_123456:file:test_file"
2902 db_mock
.get_one
.return_value
= {
2905 "folder": "/home/osm",
2906 "pkg-dir": "vnfr_test_dir",
2911 with self
.assertRaises(NsException
):
2912 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2914 def test__get_cloud_init_file(self
):
2915 db_mock
= MagicMock(name
="database mock")
2916 fs_mock
= MagicMock(name
="filesystem mock")
2917 file_mock
= MagicMock(name
="file mock")
2919 location
= "vnfr_id_123456:file:test_file"
2920 cloud_init_content
= "this is a cloud init file content"
2922 db_mock
.get_one
.return_value
= {
2925 "folder": "/home/osm",
2926 "pkg-dir": "vnfr_test_dir",
2930 fs_mock
.file_open
.return_value
= file_mock
2931 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2933 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2935 self
.assertEqual(cloud_init_content
, result
)
2937 def test__get_cloud_init_vdu(self
):
2938 db_mock
= MagicMock(name
="database mock")
2941 location
= "vnfr_id_123456:vdu:0"
2942 cloud_init_content
= "this is a cloud init file content"
2944 db_mock
.get_one
.return_value
= {
2947 "cloud-init": cloud_init_content
,
2952 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2954 self
.assertEqual(cloud_init_content
, result
)
2956 @patch("jinja2.Environment.__init__")
2957 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2958 cloud_init_content
= None
2962 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
2964 with self
.assertRaises(NsException
):
2966 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2969 @patch("jinja2.Environment.__init__")
2970 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
2971 cloud_init_content
= None
2975 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
2977 with self
.assertRaises(NsException
):
2979 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2982 @patch("jinja2.Environment.__init__")
2983 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
2984 cloud_init_content
= None
2988 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
2990 with self
.assertRaises(NsException
):
2992 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2995 def test_rendering_jinja2_temp_without_special_characters(self
):
2996 cloud_init_content
= """
2999 table_type: {{type}}
3001 overwrite: {{is_override}}
3004 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3008 "is_override": "False",
3009 "command": "; mkdir abc",
3011 context
= "cloud-init for VM"
3012 expected_result
= """
3020 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
3022 result
= Ns
._parse
_jinja
2(
3023 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3025 self
.assertEqual(result
, expected_result
)
3027 def test_rendering_jinja2_temp_with_special_characters(self
):
3028 cloud_init_content
= """
3031 table_type: {{type}}
3033 overwrite: {{is_override}}
3036 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3040 "is_override": "False",
3041 "command": "& rm -rf",
3043 context
= "cloud-init for VM"
3044 expected_result
= """
3052 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3054 result
= Ns
._parse
_jinja
2(
3055 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3057 self
.assertNotEqual(result
, expected_result
)
3059 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
3060 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
3061 mock_environment
.return_value
= Environment(
3062 undefined
=StrictUndefined
,
3063 autoescape
=select_autoescape(default_for_string
=False, default
=False),
3065 cloud_init_content
= """
3068 table_type: {{type}}
3070 overwrite: {{is_override}}
3073 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3077 "is_override": "False",
3078 "command": "& rm -rf /",
3080 context
= "cloud-init for VM"
3081 expected_result
= """
3089 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3091 result
= Ns
._parse
_jinja
2(
3092 cloud_init_content
=cloud_init_content
,
3096 self
.assertEqual(result
, expected_result
)
3098 @patch("osm_ng_ro.ns.Ns._assign_vim")
3099 def test__rebuild_start_stop_task(self
, assign_vim
):
3102 actions
= ["start", "stop", "rebuild"]
3103 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3104 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
3106 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3107 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3109 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3110 t
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3111 for action
in actions
:
3113 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3114 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3115 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3116 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
3117 "status": "SCHEDULED",
3120 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
3121 "target_record_id": t
,
3123 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3127 extra_dict
["params"] = {
3128 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3131 task
= self
.ns
.rebuild_start_stop_task(
3141 self
.assertEqual(task
.get("action_id"), action_id
)
3142 self
.assertEqual(task
.get("nsr_id"), nsr_id
)
3143 self
.assertEqual(task
.get("target_id"), target_vim
)
3144 self
.assertDictEqual(task
, expected_result
)
3146 @patch("osm_ng_ro.ns.Ns._assign_vim")
3147 def test_verticalscale_task(self
, assign_vim
):
3151 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3152 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3154 target_record_id
= (
3155 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3156 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3160 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3161 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3162 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3163 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3164 "status": "SCHEDULED",
3166 "item": "verticalscale",
3167 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3168 "target_record_id": target_record_id
,
3170 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3171 "flavor_dict": "flavor_dict",
3175 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3177 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3180 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3181 extra_dict
["params"] = {
3182 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3183 "flavor_dict": "flavor_dict",
3185 task
= self
.ns
.verticalscale_task(
3186 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3189 self
.assertDictEqual(task
, expected_result
)
3191 @patch("osm_ng_ro.ns.Ns._assign_vim")
3192 def test_migrate_task(self
, assign_vim
):
3196 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3197 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3199 target_record_id
= (
3200 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3201 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3205 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3206 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3207 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3208 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3209 "status": "SCHEDULED",
3212 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3213 "target_record_id": target_record_id
,
3215 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3216 "migrate_host": "migrateToHost",
3220 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3222 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3225 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3226 extra_dict
["params"] = {
3227 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3228 "migrate_host": "migrateToHost",
3230 task
= self
.ns
.migrate_task(
3231 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3234 self
.assertDictEqual(task
, expected_result
)
3237 class TestProcessVduParams(unittest
.TestCase
):
3240 self
.logger
= CopyingMock(autospec
=True)
3242 def test_find_persistent_root_volumes_empty_instantiation_vol_list(self
):
3243 """Find persistent root volume, instantiation_vol_list is empty."""
3244 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3245 target_vdu
= target_vdu_wth_persistent_storage
3246 vdu_instantiation_volumes_list
= []
3248 expected_persist_root_disk
= {
3249 "persistent-root-volume": {
3250 "image_id": "ubuntu20.04",
3254 expected_disk_list
= [
3256 "image_id": "ubuntu20.04",
3260 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3261 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3263 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3264 self
.assertEqual(disk_list
, expected_disk_list
)
3265 self
.assertEqual(len(disk_list
), 1)
3267 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(self
):
3268 """Find persistent root volume, always selects the first vsd as root volume."""
3269 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3270 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3271 "persistent-volume2",
3272 "persistent-root-volume",
3275 target_vdu
= target_vdu_wth_persistent_storage
3276 vdu_instantiation_volumes_list
= []
3278 expected_persist_root_disk
= {
3279 "persistent-volume2": {
3280 "image_id": "ubuntu20.04",
3284 expected_disk_list
= [
3286 "image_id": "ubuntu20.04",
3290 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3291 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3293 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3294 self
.assertEqual(disk_list
, expected_disk_list
)
3295 self
.assertEqual(len(disk_list
), 1)
3297 def test_find_persistent_root_volumes_empty_size_of_storage(self
):
3298 """Find persistent root volume, size of storage is empty."""
3299 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3300 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3301 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3302 "persistent-volume2",
3303 "persistent-root-volume",
3306 target_vdu
= target_vdu_wth_persistent_storage
3307 vdu_instantiation_volumes_list
= []
3309 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3310 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3312 self
.assertEqual(persist_root_disk
, None)
3313 self
.assertEqual(disk_list
, [])
3315 def test_find_persistent_root_empty_disk_list(self
):
3316 """Find persistent root volume, empty disk list."""
3317 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3318 target_vdu
= target_vdu_wth_persistent_storage
3319 vdu_instantiation_volumes_list
= []
3321 expected_persist_root_disk
= {
3322 "persistent-root-volume": {
3323 "image_id": "ubuntu20.04",
3327 expected_disk_list
= [
3329 "image_id": "ubuntu20.04",
3333 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3334 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3336 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3337 self
.assertEqual(disk_list
, expected_disk_list
)
3338 self
.assertEqual(len(disk_list
), 1)
3340 def test_find_persistent_root_volumes_target_vdu_mismatch(self
):
3341 """Find persistent root volume, target vdu name is not matching."""
3342 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3343 vnfd
["vdu"][0]["name"] = "Several_Volumes-VM"
3344 target_vdu
= target_vdu_wth_persistent_storage
3345 vdu_instantiation_volumes_list
= []
3347 result
= self
.ns
.find_persistent_root_volumes(
3348 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3350 self
.assertEqual(result
, None)
3351 self
.assertEqual(disk_list
, [])
3352 self
.assertEqual(len(disk_list
), 0)
3354 def test_find_persistent_root_volumes_with_instantiation_vol_list(self
):
3355 """Find persistent root volume, existing volume needs to be used."""
3356 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3357 target_vdu
= target_vdu_wth_persistent_storage
3358 vdu_instantiation_volumes_list
= [
3360 "vim-volume-id": vim_volume_id
,
3361 "name": "persistent-root-volume",
3365 expected_persist_root_disk
= {
3366 "persistent-root-volume": {
3367 "vim_volume_id": vim_volume_id
,
3368 "image_id": "ubuntu20.04",
3371 expected_disk_list
= [
3373 "vim_volume_id": vim_volume_id
,
3374 "image_id": "ubuntu20.04",
3377 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3378 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3380 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3381 self
.assertEqual(disk_list
, expected_disk_list
)
3382 self
.assertEqual(len(disk_list
), 1)
3384 def test_find_persistent_root_volumes_invalid_instantiation_params(self
):
3385 """Find persistent root volume, existing volume id keyword is invalid."""
3386 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3387 target_vdu
= target_vdu_wth_persistent_storage
3388 vdu_instantiation_volumes_list
= [
3390 "volume-id": vim_volume_id
,
3391 "name": "persistent-root-volume",
3395 with self
.assertRaises(KeyError):
3396 self
.ns
.find_persistent_root_volumes(
3397 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3400 self
.assertEqual(disk_list
, [])
3401 self
.assertEqual(len(disk_list
), 0)
3403 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3406 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3407 persistent_root_disk
= {
3408 "persistent-root-volume": {
3409 "image_id": "ubuntu20.04",
3414 target_vdu
= target_vdu_wth_persistent_storage
3415 vdu_instantiation_volumes_list
= []
3418 "image_id": "ubuntu20.04",
3423 expected_disk_list
= [
3425 "image_id": "ubuntu20.04",
3432 self
.ns
.find_persistent_volumes(
3433 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3435 self
.assertEqual(disk_list
, expected_disk_list
)
3437 def test_find_persistent_volumes_vdu_wth_inst_vol_list(self
):
3438 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3439 persistent_root_disk
= {
3440 "persistent-root-volume": {
3441 "image_id": "ubuntu20.04",
3445 vdu_instantiation_volumes_list
= [
3447 "vim-volume-id": vim_volume_id
,
3448 "name": "persistent-volume2",
3451 target_vdu
= target_vdu_wth_persistent_storage
3454 "image_id": "ubuntu20.04",
3458 expected_disk_list
= [
3460 "image_id": "ubuntu20.04",
3464 "vim_volume_id": vim_volume_id
,
3467 self
.ns
.find_persistent_volumes(
3468 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3470 self
.assertEqual(disk_list
, expected_disk_list
)
3472 def test_find_persistent_volumes_vdu_wthout_persistent_storage(self
):
3473 """Find persistent ordinary volume, there is not any persistent disk."""
3474 persistent_root_disk
= {}
3475 vdu_instantiation_volumes_list
= []
3476 target_vdu
= target_vdu_wthout_persistent_storage
3478 self
.ns
.find_persistent_volumes(
3479 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3481 self
.assertEqual(disk_list
, disk_list
)
3483 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3486 """There is persistent root disk, but there is not ordinary persistent disk."""
3487 persistent_root_disk
= {
3488 "persistent-root-volume": {
3489 "image_id": "ubuntu20.04",
3493 vdu_instantiation_volumes_list
= []
3494 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3495 target_vdu
["virtual-storages"] = [
3497 "id": "persistent-root-volume",
3498 "size-of-storage": "10",
3499 "type-of-storage": "persistent-storage:persistent-storage",
3502 "id": "ephemeral-volume",
3503 "size-of-storage": "1",
3504 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3509 "image_id": "ubuntu20.04",
3513 self
.ns
.find_persistent_volumes(
3514 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3516 self
.assertEqual(disk_list
, disk_list
)
3518 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(self
):
3519 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3520 vim-volume-id is given as instantiation parameter but disk id is not matching."""
3521 vdu_instantiation_volumes_list
= [
3523 "vim-volume-id": vim_volume_id
,
3524 "name": "persistent-volume3",
3527 persistent_root_disk
= {
3528 "persistent-root-volume": {
3529 "image_id": "ubuntu20.04",
3535 "image_id": "ubuntu20.04",
3539 expected_disk_list
= [
3541 "image_id": "ubuntu20.04",
3549 target_vdu
= target_vdu_wth_persistent_storage
3550 self
.ns
.find_persistent_volumes(
3551 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3553 self
.assertEqual(disk_list
, expected_disk_list
)
3555 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3556 """Interfaces are sorted according to position, all have positions."""
3557 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3558 target_vdu
["interfaces"] = [
3561 "ns-vld-id": "datanet",
3566 "ns-vld-id": "mgmtnet",
3570 sorted_interfaces
= [
3573 "ns-vld-id": "mgmtnet",
3578 "ns-vld-id": "datanet",
3582 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3583 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3585 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3586 """Interfaces are sorted according to position, some of them have positions."""
3587 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3588 target_vdu
["interfaces"] = [
3591 "ns-vld-id": "mgmtnet",
3595 "ns-vld-id": "datanet",
3599 sorted_interfaces
= [
3602 "ns-vld-id": "datanet",
3607 "ns-vld-id": "mgmtnet",
3610 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3611 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3613 def test_sort_vdu_interfaces_position_empty_interface_list(self
):
3614 """Interface list is empty."""
3615 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3616 target_vdu
["interfaces"] = []
3617 sorted_interfaces
= []
3618 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3619 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3621 def test_partially_locate_vdu_interfaces(self
):
3622 """Some interfaces have positions."""
3623 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3624 target_vdu
["interfaces"] = [
3627 "ns-vld-id": "net1",
3629 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3632 "ns-vld-id": "mgmtnet",
3636 "ns-vld-id": "datanet",
3640 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3641 self
.assertDictEqual(
3642 target_vdu
["interfaces"][0],
3645 "ns-vld-id": "datanet",
3649 self
.assertDictEqual(
3650 target_vdu
["interfaces"][2],
3651 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3654 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3655 """Some interfaces have positions, position start from 0."""
3656 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3657 target_vdu
["interfaces"] = [
3660 "ns-vld-id": "net1",
3662 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3665 "ns-vld-id": "mgmtnet",
3669 "ns-vld-id": "datanet",
3673 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3674 self
.assertDictEqual(
3675 target_vdu
["interfaces"][0],
3678 "ns-vld-id": "datanet",
3682 self
.assertDictEqual(
3683 target_vdu
["interfaces"][3],
3684 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Interfaces do not have positions.

    The interface list is compared against a snapshot taken before the
    call, so the test passes only when the list is left as it was.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Work on a private copy of the module-level fixture so the method
    # under test can never alter the list shared with other tests.
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(target_vdu["interfaces"])

    self.ns._partially_locate_vdu_interfaces(target_vdu)

    self.assertEqual(target_vdu["interfaces"], expected_result)
3695 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3696 """All interfaces have position."""
3697 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3698 target_vdu
["interfaces"] = interfaces_wth_all_positions
3699 expected_interfaces
= [
3702 "ns-vld-id": "net2",
3707 "ns-vld-id": "mgmtnet",
3712 "ns-vld-id": "net1",
3716 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3717 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3719 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3720 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3721 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3722 """Target_vdu has cloud-init and boot-data-drive."""
3723 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3724 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3725 target_vdu
["boot-data-drive"] = "vda"
3727 mock_get_cloud_init
.return_value
= cloud_init_content
3728 mock_parse_jinja2
.return_value
= user_data
3730 "user-data": user_data
,
3731 "boot-data-drive": "vda",
3733 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3734 self
.assertDictEqual(result
, expected_result
)
3735 mock_get_cloud_init
.assert_called_once_with(
3736 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3738 mock_parse_jinja2
.assert_called_once_with(
3739 cloud_init_content
=cloud_init_content
,
3741 context
="sample-cloud-init-path",
3744 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3745 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3746 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3747 self
, mock_parse_jinja2
, mock_get_cloud_init
3749 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3750 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3751 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3752 target_vdu
["boot-data-drive"] = "vda"
3754 mock_get_cloud_init
.side_effect
= NsException(
3755 "Mismatch descriptor for cloud init."
3758 with self
.assertRaises(NsException
) as err
:
3759 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3760 self
.assertEqual(str(err
.exception
), "Mismatch descriptor for cloud init.")
3762 mock_get_cloud_init
.assert_called_once_with(
3763 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3765 mock_parse_jinja2
.assert_not_called()
3767 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3768 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3769 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3770 self
, mock_parse_jinja2
, mock_get_cloud_init
3772 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3773 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3774 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3775 target_vdu
["boot-data-drive"] = "vda"
3777 mock_get_cloud_init
.return_value
= cloud_init_content
3778 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3780 with self
.assertRaises(NsException
) as err
:
3781 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3782 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3783 mock_get_cloud_init
.assert_called_once_with(
3784 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3786 mock_parse_jinja2
.assert_called_once_with(
3787 cloud_init_content
=cloud_init_content
,
3789 context
="sample-cloud-init-path",
3792 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3793 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3794 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3795 self
, mock_parse_jinja2
, mock_get_cloud_init
3797 """Target_vdu has cloud-init but do not have boot-data-drive."""
3798 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3799 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3801 mock_get_cloud_init
.return_value
= cloud_init_content
3802 mock_parse_jinja2
.return_value
= user_data
3804 "user-data": user_data
,
3806 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3807 self
.assertDictEqual(result
, expected_result
)
3808 mock_get_cloud_init
.assert_called_once_with(
3809 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3811 mock_parse_jinja2
.assert_called_once_with(
3812 cloud_init_content
=cloud_init_content
,
3814 context
="sample-cloud-init-path",
3817 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3818 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3819 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
3820 self
, mock_parse_jinja2
, mock_get_cloud_init
3822 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
3823 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3824 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3825 target_vdu
["boot-data-drive"] = "vda"
3826 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
3827 mock_parse_jinja2
.return_value
= user_data
3829 "user-data": user_data
,
3830 "boot-data-drive": "vda",
3832 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3833 self
.assertDictEqual(result
, expected_result
)
3834 mock_get_cloud_init
.assert_not_called()
3835 mock_parse_jinja2
.assert_called_once_with(
3836 cloud_init_content
=cloud_init_content
,
3838 context
="sample-cloud-init-path",
3841 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3842 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3843 def test_prepare_vdu_cloud_init_no_cloud_init(
3844 self
, mock_parse_jinja2
, mock_get_cloud_init
3846 """Target_vdu do not have cloud-init."""
3847 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3848 target_vdu
["boot-data-drive"] = "vda"
3851 "boot-data-drive": "vda",
3853 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3854 self
.assertDictEqual(result
, expected_result
)
3855 mock_get_cloud_init
.assert_not_called()
3856 mock_parse_jinja2
.assert_not_called()
3858 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
3859 """ns_vld and vnf_vld both exist."""
3862 "ns-vld-id": "mgmtnet",
3863 "vnf-vld-id": "mgmt_cp_int",
3865 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
3866 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3867 interface
, ns_preffix
, vnf_preffix
3869 self
.assertEqual(result
, expected_result
)
3871 def test_check_vld_information_of_interfaces_empty_interfaces(self
):
3872 """Interface dict is empty."""
3874 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3875 interface
, ns_preffix
, vnf_preffix
3877 self
.assertEqual(result
, "")
3879 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
3880 """Interface dict has only vnf_vld."""
3883 "vnf-vld-id": "mgmt_cp_int",
3885 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
3886 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3887 interface
, ns_preffix
, vnf_preffix
3889 self
.assertEqual(result
, expected_result
)
3891 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
3894 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
3897 "vnf-vld-id": "mgmt_cp_int",
3900 with self
.assertRaises(Exception) as err
:
3901 self
.ns
._check
_vld
_information
_of
_interfaces
(
3902 interface
, ns_preffix
, vnf_preffix
3904 self
.assertEqual(type(err
), TypeError)
3906 def test_prepare_interface_port_security_has_security_details(self
):
3907 """Interface dict has port security details."""
3910 "ns-vld-id": "mgmtnet",
3911 "vnf-vld-id": "mgmt_cp_int",
3912 "port-security-enabled": True,
3913 "port-security-disable-strategy": "allow-address-pairs",
3915 expected_interface
= {
3917 "ns-vld-id": "mgmtnet",
3918 "vnf-vld-id": "mgmt_cp_int",
3919 "port_security": True,
3920 "port_security_disable_strategy": "allow-address-pairs",
3922 self
.ns
._prepare
_interface
_port
_security
(interface
)
3923 self
.assertDictEqual(interface
, expected_interface
)
3925 def test_prepare_interface_port_security_empty_interfaces(self
):
3926 """Interface dict is empty."""
3928 expected_interface
= {}
3929 self
.ns
._prepare
_interface
_port
_security
(interface
)
3930 self
.assertDictEqual(interface
, expected_interface
)
3932 def test_prepare_interface_port_security_wthout_port_security(self
):
3933 """Interface dict does not have port security details."""
3936 "ns-vld-id": "mgmtnet",
3937 "vnf-vld-id": "mgmt_cp_int",
3939 expected_interface
= {
3941 "ns-vld-id": "mgmtnet",
3942 "vnf-vld-id": "mgmt_cp_int",
3944 self
.ns
._prepare
_interface
_port
_security
(interface
)
3945 self
.assertDictEqual(interface
, expected_interface
)
3947 def test_create_net_item_of_interface_floating_ip_port_security(self
):
3948 """Interface dict has floating ip, port-security details."""
3951 "vcpi": "sample_vcpi",
3952 "port_security": True,
3953 "port_security_disable_strategy": "allow-address-pairs",
3954 "floating_ip": "10.1.1.12",
3955 "ns-vld-id": "mgmtnet",
3956 "vnf-vld-id": "mgmt_cp_int",
3958 net_text
= f
"{ns_preffix}"
3959 expected_net_item
= {
3961 "port_security": True,
3962 "port_security_disable_strategy": "allow-address-pairs",
3963 "floating_ip": "10.1.1.12",
3964 "net_id": f
"TASK-{ns_preffix}",
3967 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3968 self
.assertDictEqual(result
, expected_net_item
)
3970 def test_create_net_item_of_interface_invalid_net_text(self
):
3971 """net-text is invalid."""
3974 "vcpi": "sample_vcpi",
3975 "port_security": True,
3976 "port_security_disable_strategy": "allow-address-pairs",
3977 "floating_ip": "10.1.1.12",
3978 "ns-vld-id": "mgmtnet",
3979 "vnf-vld-id": "mgmt_cp_int",
3982 with self
.assertRaises(TypeError):
3983 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3985 def test_create_net_item_of_interface_empty_interface(self
):
3986 """Interface dict is empty."""
3988 net_text
= ns_preffix
3989 expected_net_item
= {
3990 "net_id": f
"TASK-{ns_preffix}",
3993 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3994 self
.assertDictEqual(result
, expected_net_item
)
3996 @patch("osm_ng_ro.ns.deep_get")
3997 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
3998 """Interface type is SR-IOV."""
4001 "vcpi": "sample_vcpi",
4002 "port_security": True,
4003 "port_security_disable_strategy": "allow-address-pairs",
4004 "floating_ip": "10.1.1.12",
4005 "ns-vld-id": "mgmtnet",
4006 "vnf-vld-id": "mgmt_cp_int",
4009 mock_deep_get
.return_value
= "SR-IOV"
4010 net_text
= ns_preffix
4012 expected_net_item
= {
4017 self
.ns
._prepare
_type
_of
_interface
(
4018 interface
, tasks_by_target_record_id
, net_text
, net_item
4020 self
.assertDictEqual(net_item
, expected_net_item
)
4023 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4025 mock_deep_get
.assert_called_once_with(
4026 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4029 @patch("osm_ng_ro.ns.deep_get")
4030 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4033 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4036 "vcpi": "sample_vcpi",
4037 "port_security": True,
4038 "port_security_disable_strategy": "allow-address-pairs",
4039 "floating_ip": "10.1.1.12",
4040 "ns-vld-id": "mgmtnet",
4041 "vnf-vld-id": "mgmt_cp_int",
4042 "type": "PCI-PASSTHROUGH",
4044 mock_deep_get
.return_value
= {}
4045 tasks_by_target_record_id
= {}
4046 net_text
= ns_preffix
4048 expected_net_item
= {
4050 "model": "PCI-PASSTHROUGH",
4051 "type": "PCI-PASSTHROUGH",
4053 self
.ns
._prepare
_type
_of
_interface
(
4054 interface
, tasks_by_target_record_id
, net_text
, net_item
4056 self
.assertDictEqual(net_item
, expected_net_item
)
4057 mock_deep_get
.assert_called_once_with(
4058 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4061 @patch("osm_ng_ro.ns.deep_get")
4062 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4063 """Interface type is mgmt."""
4066 "vcpi": "sample_vcpi",
4067 "port_security": True,
4068 "port_security_disable_strategy": "allow-address-pairs",
4069 "floating_ip": "10.1.1.12",
4070 "ns-vld-id": "mgmtnet",
4071 "vnf-vld-id": "mgmt_cp_int",
4074 tasks_by_target_record_id
= {}
4075 net_text
= ns_preffix
4077 expected_net_item
= {
4080 self
.ns
._prepare
_type
_of
_interface
(
4081 interface
, tasks_by_target_record_id
, net_text
, net_item
4083 self
.assertDictEqual(net_item
, expected_net_item
)
4084 mock_deep_get
.assert_not_called()
4086 @patch("osm_ng_ro.ns.deep_get")
4087 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4088 """Interface type is bridge."""
4091 "vcpi": "sample_vcpi",
4092 "port_security": True,
4093 "port_security_disable_strategy": "allow-address-pairs",
4094 "floating_ip": "10.1.1.12",
4095 "ns-vld-id": "mgmtnet",
4096 "vnf-vld-id": "mgmt_cp_int",
4098 tasks_by_target_record_id
= {}
4099 net_text
= ns_preffix
4101 expected_net_item
= {
4105 self
.ns
._prepare
_type
_of
_interface
(
4106 interface
, tasks_by_target_record_id
, net_text
, net_item
4108 self
.assertDictEqual(net_item
, expected_net_item
)
4109 mock_deep_get
.assert_not_called()
4111 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4112 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4113 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4114 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4115 def test_prepare_vdu_interfaces(
4117 mock_type_of_interface
,
4118 mock_item_of_interface
,
4120 mock_vld_information_of_interface
,
4122 """Prepare vdu interfaces successfully."""
4123 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4126 "ns-vld-id": "net1",
4127 "ip-address": "13.2.12.31",
4128 "mgmt-interface": True,
4132 "vnf-vld-id": "net2",
4133 "mac-address": "d0:94:66:ed:fc:e2",
4137 "ns-vld-id": "mgmtnet",
4139 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4141 "params": "test_params",
4142 "find_params": "test_find_params",
4146 net_text_1
= f
"{ns_preffix}:net1"
4147 net_text_2
= f
"{vnf_preffix}:net2"
4148 net_text_3
= f
"{ns_preffix}:mgmtnet"
4151 "net_id": f
"TASK-{ns_preffix}",
4156 "net_id": f
"TASK-{ns_preffix}",
4161 "net_id": f
"TASK-{ns_preffix}",
4164 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4165 mock_vld_information_of_interface
.side_effect
= [
4171 expected_extra_dict
= {
4172 "params": "test_params",
4173 "find_params": "test_find_params",
4174 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4175 "mgmt_vdu_interface": 0,
4177 updated_net_item1
= deepcopy(net_item_1
)
4178 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4179 updated_net_item2
= deepcopy(net_item_2
)
4180 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4181 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4182 self
.ns
._prepare
_vdu
_interfaces
(
4188 tasks_by_target_record_id
,
4191 _call_mock_vld_information_of_interface
= (
4192 mock_vld_information_of_interface
.call_args_list
4195 _call_mock_vld_information_of_interface
[0][0],
4196 (interface_1
, ns_preffix
, vnf_preffix
),
4199 _call_mock_vld_information_of_interface
[1][0],
4200 (interface_2
, ns_preffix
, vnf_preffix
),
4203 _call_mock_vld_information_of_interface
[2][0],
4204 (interface_3
, ns_preffix
, vnf_preffix
),
4207 _call_mock_port_security
= mock_port_security
.call_args_list
4208 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4209 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4210 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4212 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4213 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4214 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4215 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4217 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4219 _call_mock_type_of_interface
[0][0],
4220 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4223 _call_mock_type_of_interface
[1][0],
4224 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4227 _call_mock_type_of_interface
[2][0],
4228 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4230 self
.assertEqual(net_list
, expected_net_list
)
4231 self
.assertEqual(extra_dict
, expected_extra_dict
)
4232 self
.logger
.error
.assert_not_called()
4234 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4235 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4236 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4237 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4238 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4240 mock_type_of_interface
,
4241 mock_item_of_interface
,
4243 mock_vld_information_of_interface
,
4245 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4246 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4249 "ns-vld-id": "net1",
4250 "ip-address": "13.2.12.31",
4251 "mgmt-interface": True,
4255 "vnf-vld-id": "net2",
4256 "mac-address": "d0:94:66:ed:fc:e2",
4260 "ns-vld-id": "mgmtnet",
4262 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4264 "params": "test_params",
4265 "find_params": "test_find_params",
4268 net_text_1
= f
"{ns_preffix}:net1"
4269 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4271 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4273 expected_extra_dict
= {
4274 "params": "test_params",
4275 "find_params": "test_find_params",
4276 "depends_on": [net_text_1
],
4278 with self
.assertRaises(TypeError):
4279 self
.ns
._prepare
_vdu
_interfaces
(
4285 tasks_by_target_record_id
,
4289 _call_mock_vld_information_of_interface
= (
4290 mock_vld_information_of_interface
.call_args_list
4293 _call_mock_vld_information_of_interface
[0][0],
4294 (interface_1
, ns_preffix
, vnf_preffix
),
4297 _call_mock_port_security
= mock_port_security
.call_args_list
4298 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4300 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4301 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4303 mock_type_of_interface
.assert_not_called()
4304 self
.logger
.error
.assert_not_called()
4305 self
.assertEqual(net_list
, [])
4306 self
.assertEqual(extra_dict
, expected_extra_dict
)
4308 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4309 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4310 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4311 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4312 def test_prepare_vdu_interfaces_vld_information_is_empty(
4314 mock_type_of_interface
,
4315 mock_item_of_interface
,
4317 mock_vld_information_of_interface
,
4319 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4320 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4323 "ns-vld-id": "net1",
4324 "ip-address": "13.2.12.31",
4325 "mgmt-interface": True,
4329 "vnf-vld-id": "net2",
4330 "mac-address": "d0:94:66:ed:fc:e2",
4334 "ns-vld-id": "mgmtnet",
4336 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4338 "params": "test_params",
4339 "find_params": "test_find_params",
4342 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4344 self
.ns
._prepare
_vdu
_interfaces
(
4350 tasks_by_target_record_id
,
4354 _call_mock_vld_information_of_interface
= (
4355 mock_vld_information_of_interface
.call_args_list
4358 _call_mock_vld_information_of_interface
[0][0],
4359 (interface_1
, ns_preffix
, vnf_preffix
),
4362 _call_mock_vld_information_of_interface
[1][0],
4363 (interface_2
, ns_preffix
, vnf_preffix
),
4366 _call_mock_vld_information_of_interface
[2][0],
4367 (interface_3
, ns_preffix
, vnf_preffix
),
4370 _call_logger
= self
.logger
.error
.call_args_list
4373 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4377 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4381 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4383 self
.assertEqual(net_list
, [])
4387 "params": "test_params",
4388 "find_params": "test_find_params",
4393 mock_item_of_interface
.assert_not_called()
4394 mock_port_security
.assert_not_called()
4395 mock_type_of_interface
.assert_not_called()
4397 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4398 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4399 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4400 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4401 def test_prepare_vdu_interfaces_empty_interface_list(
4403 mock_type_of_interface
,
4404 mock_item_of_interface
,
4406 mock_vld_information_of_interface
,
4408 """Prepare vdu interfaces, interface list is empty."""
4409 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4410 target_vdu
["interfaces"] = []
4413 self
.ns
._prepare
_vdu
_interfaces
(
4419 tasks_by_target_record_id
,
4422 mock_type_of_interface
.assert_not_called()
4423 mock_vld_information_of_interface
.assert_not_called()
4424 mock_item_of_interface
.assert_not_called()
4425 mock_port_security
.assert_not_called()
4427 def test_prepare_vdu_ssh_keys(self
):
4428 """Target_vdu has ssh-keys and ro_nsr_public_key exists."""
4429 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4430 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4431 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4432 target_vdu
["ssh-access-required"] = True
4434 expected_cloud_config
= {
4435 "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]
4437 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4438 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4440 def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self
):
4441 """Target_vdu does not have ssh-keys."""
4442 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4443 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4444 target_vdu
["ssh-access-required"] = True
4446 expected_cloud_config
= {"key-pairs": [{"public_key": "path_of_public_key"}]}
4447 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4448 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4450 def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self
):
4451 """Target_vdu has ssh-keys, ssh-access is not required."""
4452 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4453 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4454 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4455 target_vdu
["ssh-access-required"] = False
4457 expected_cloud_config
= {"key-pairs": ["sample-ssh-key"]}
4458 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4459 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4461 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4462 def test_add_persistent_root_disk_to_disk_list(
4463 self
, mock_select_persistent_root_disk
4465 """Add persistent root disk to disk_list"""
4467 "id": "persistent-root-volume",
4468 "type-of-storage": "persistent-storage:persistent-storage",
4469 "size-of-storage": "10",
4471 mock_select_persistent_root_disk
.return_value
= root_disk
4472 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4473 vnfd
["virtual-storage-desc"][1] = root_disk
4474 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4475 persistent_root_disk
= {}
4477 expected_disk_list
= [
4479 "image_id": "ubuntu20.04",
4483 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4484 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4486 self
.assertEqual(disk_list
, expected_disk_list
)
4487 mock_select_persistent_root_disk
.assert_called_once()
4489 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4490 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4491 self
, mock_select_persistent_root_disk
4493 """Add persistent root disk to disk_list"""
4495 "id": "persistent-root-volume",
4496 "type-of-storage": "persistent-storage:persistent-storage",
4497 "size-of-storage": "10",
4499 mock_select_persistent_root_disk
.side_effect
= AttributeError
4500 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4501 vnfd
["virtual-storage-desc"][1] = root_disk
4502 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4503 persistent_root_disk
= {}
4505 with self
.assertRaises(AttributeError):
4506 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4507 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4509 self
.assertEqual(disk_list
, [])
4510 mock_select_persistent_root_disk
.assert_called_once()
4512 def test_add_persistent_ordinary_disk_to_disk_list(self
):
4513 """Add persistent ordinary disk to disk_list"""
4514 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4515 persistent_root_disk
= {
4516 "persistent-root-volume": {
4517 "image_id": "ubuntu20.04",
4521 persistent_ordinary_disk
= {}
4523 expected_disk_list
= [
4528 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4529 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4531 self
.assertEqual(disk_list
, expected_disk_list
)
4533 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(self
):
4534 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4535 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4536 persistent_root_disk
= {
4537 "persistent-root-volume": {
4538 "image_id": "ubuntu20.04",
4541 "persistent-volume2": {
4545 persistent_ordinary_disk
= {}
4548 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4549 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4551 self
.assertEqual(disk_list
, [])
4553 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4554 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
4555 self
, mock_select_persistent_root_disk
4557 """VNFD does not have persistent storage."""
4558 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4559 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4560 mock_select_persistent_root_disk
.return_value
= None
4561 persistent_root_disk
= {}
4563 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4564 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4566 self
.assertEqual(disk_list
, [])
4567 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4569 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4570 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
4571 self
, mock_select_persistent_root_disk
4573 """Persistent_root_disk dict is empty."""
4574 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4575 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4576 mock_select_persistent_root_disk
.return_value
= None
4577 persistent_root_disk
= {}
4579 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4580 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4582 self
.assertEqual(disk_list
, [])
4583 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4585 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self
):
4586 """Invalid extra dict."""
4587 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4588 target_vdu
["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4590 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4591 with self
.assertRaises(NsException
) as err
:
4592 self
.ns
._prepare
_vdu
_affinity
_group
_list
(target_vdu
, extra_dict
, ns_preffix
)
4593 self
.assertEqual(str(err
.exception
), "Invalid extra_dict format.")
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A single affinity group yields one TASK entry and one dependency."""
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_ref = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    extra_dict = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, extra_dict, ns_preffix
    )

    # The call is expected to record the group reference in extra_dict
    # and to return it prefixed with "TASK-".
    self.assertDictEqual(extra_dict, {"depends_on": [group_ref]})
    self.assertEqual(
        affinity_groups, [{"affinity_group_id": "TASK-" + group_ref}]
    )
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """Two affinity groups yield two TASK entries and two dependencies, in order."""
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_refs = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2",
    ]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    extra_dict = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, extra_dict, ns_preffix
    )

    # Both the dependency list and the returned entries must follow the
    # order in which the group ids were declared on the VDU.
    self.assertDictEqual(extra_dict, {"depends_on": list(group_refs)})
    self.assertEqual(
        affinity_groups,
        [{"affinity_group_id": "TASK-" + ref} for ref in group_refs],
    )
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """With no affinity-group id set on the VDU, nothing is returned or recorded."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    extra_dict = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, extra_dict, "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    )

    # The returned list is empty and extra_dict gains no dependencies.
    self.assertEqual(affinity_groups, [])
    self.assertDictEqual(extra_dict, {"depends_on": []})
# Exercises Ns._process_vdu_params for a VDU that carries an instantiation
# volume list under additionalParams -> OSM -> vdu_volumes.
# Ten Ns helpers are patched. @patch decorators apply bottom-up, so the last
# decorator (_prepare_vdu_affinity_group_list) supplies the first mock
# argument and the first decorator (_sort_vdu_interfaces) the last one.
4643 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4644 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4645 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4646 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4647 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4648 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4649 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4650 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4651 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4652 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4653 def test_process_vdu_params_with_inst_vol_list(
4655 mock_prepare_vdu_affinity_group_list
,
4656 mock_add_persistent_ordinary_disks_to_disk_list
,
4657 mock_add_persistent_root_disk_to_disk_list
,
4658 mock_find_persistent_volumes
,
4659 mock_find_persistent_root_volumes
,
4660 mock_prepare_vdu_ssh_keys
,
4661 mock_prepare_vdu_cloud_init
,
4662 mock_prepare_vdu_interfaces
,
4663 mock_locate_vdu_interfaces
,
4664 mock_sort_vdu_interfaces
,
4666 """Instantiation volume list is provided for the VDU."""
4667 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4669 target_vdu
["interfaces"] = interfaces_wth_all_positions
# One pre-existing VIM volume is mapped to "persistent-volume2".
4671 vdu_instantiation_vol_list
= [
4673 "vim-volume-id": vim_volume_id
,
4674 "name": "persistent-volume2",
4677 target_vdu
["additionalParams"] = {
4678 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
# Stub the return values of the patched helpers.
4680 mock_prepare_vdu_cloud_init
.return_value
= {}
4681 mock_prepare_vdu_affinity_group_list
.return_value
= []
4682 persistent_root_disk
= {
4683 "persistent-root-volume": {
4684 "image_id": "ubuntu20.04",
4688 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
# Work on a copy so the shared module-level kwargs fixture is not mutated.
4690 new_kwargs
= deepcopy(kwargs
)
4695 "tasks_by_target_record_id": {},
4699 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4700 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4701 db
.get_one
.return_value
= vnfd
4702 result
= Ns
._process
_vdu
_params
(
4703 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# Interfaces are sorted rather than partially located; neither
# add-persistent-disk helper is expected to run in this scenario.
4705 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4706 mock_locate_vdu_interfaces
.assert_not_called()
4707 mock_prepare_vdu_cloud_init
.assert_called_once()
4708 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4709 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4710 mock_prepare_vdu_interfaces
.assert_called_once_with(
4712 expected_extra_dict_copy
,
4719 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4720 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4721 mock_prepare_vdu_affinity_group_list
.assert_called_once()
# find_persistent_volumes must receive the stubbed root-disk mapping, the
# VDU, the instantiation volume list and an empty list.
4722 mock_find_persistent_volumes
.assert_called_once_with(
4723 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
# Exercises Ns._process_vdu_params when the patched affinity-group helper
# returns a list; the expected result is expected_extra_dict2 with
# params["affinity_group_list"] overridden.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
4726 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4727 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4728 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4729 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4730 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4731 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4732 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4733 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4734 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4735 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4736 def test_process_vdu_params_wth_affinity_groups(
4738 mock_prepare_vdu_affinity_group_list
,
4739 mock_add_persistent_ordinary_disks_to_disk_list
,
4740 mock_add_persistent_root_disk_to_disk_list
,
4741 mock_find_persistent_volumes
,
4742 mock_find_persistent_root_volumes
,
4743 mock_prepare_vdu_ssh_keys
,
4744 mock_prepare_vdu_cloud_init
,
4745 mock_prepare_vdu_interfaces
,
4746 mock_locate_vdu_interfaces
,
4747 mock_sort_vdu_interfaces
,
4749 """There are affinity groups."""
4750 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4753 target_vdu
["interfaces"] = interfaces_wth_all_positions
# Stub the return values of the patched helpers.
4754 mock_prepare_vdu_cloud_init
.return_value
= {}
4755 mock_prepare_vdu_affinity_group_list
.return_value
= [
# Work on a copy so the shared module-level kwargs fixture is not mutated.
4760 new_kwargs
= deepcopy(kwargs
)
4765 "tasks_by_target_record_id": {},
# Expected output: the no-persistent-storage baseline plus the affinity list.
4769 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4770 expected_extra_dict3
["params"]["affinity_group_list"] = [
4774 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4775 db
.get_one
.return_value
= vnfd
4776 result
= Ns
._process
_vdu
_params
(
4777 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4779 self
.assertDictEqual(result
, expected_extra_dict3
)
# Both add-persistent-disk helpers run once; find_persistent_volumes does not.
4780 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4781 mock_locate_vdu_interfaces
.assert_not_called()
4782 mock_prepare_vdu_cloud_init
.assert_called_once()
4783 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4784 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4785 mock_prepare_vdu_interfaces
.assert_called_once_with(
4787 expected_extra_dict3
,
4795 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4796 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4797 mock_find_persistent_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when the patched cloud-init helper returns
# a cloud config ("user-data" plus "boot-data-drive"); that config is expected
# under params["cloud_config"] of the result and as the third argument of the
# ssh-keys helper call.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
4799 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4800 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4801 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4802 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4803 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4804 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4805 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4806 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4807 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4808 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4809 def test_process_vdu_params_wth_cloud_config(
4811 mock_prepare_vdu_affinity_group_list
,
4812 mock_add_persistent_ordinary_disks_to_disk_list
,
4813 mock_add_persistent_root_disk_to_disk_list
,
4814 mock_find_persistent_volumes
,
4815 mock_find_persistent_root_volumes
,
4816 mock_prepare_vdu_ssh_keys
,
4817 mock_prepare_vdu_cloud_init
,
4818 mock_prepare_vdu_interfaces
,
4819 mock_locate_vdu_interfaces
,
4820 mock_sort_vdu_interfaces
,
4822 """There is cloud-config."""
4823 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4826 target_vdu
["interfaces"] = interfaces_wth_all_positions
# Stub the cloud-init helper with a non-empty cloud config.
4827 mock_prepare_vdu_cloud_init
.return_value
= {
4828 "user-data": user_data
,
4829 "boot-data-drive": "vda",
4831 mock_prepare_vdu_affinity_group_list
.return_value
= []
# Work on a copy so the shared module-level kwargs fixture is not mutated.
4833 new_kwargs
= deepcopy(kwargs
)
4838 "tasks_by_target_record_id": {},
# Expected output: the no-persistent-storage baseline plus the cloud config.
4842 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4843 expected_extra_dict3
["params"]["cloud_config"] = {
4844 "user-data": user_data
,
4845 "boot-data-drive": "vda",
4847 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4848 db
.get_one
.return_value
= vnfd
4849 result
= Ns
._process
_vdu
_params
(
4850 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4852 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4853 mock_locate_vdu_interfaces
.assert_not_called()
4854 mock_prepare_vdu_cloud_init
.assert_called_once()
4855 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4856 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4857 mock_prepare_vdu_interfaces
.assert_called_once_with(
4859 expected_extra_dict3
,
4866 self
.assertDictEqual(result
, expected_extra_dict3
)
# The stubbed cloud config must be forwarded to the ssh-keys helper.
4867 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
4868 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
4870 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4871 mock_find_persistent_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params with the VDU and VNFD fixtures that have no
# persistent storage; the result must equal expected_extra_dict2.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
4873 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4874 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4875 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4876 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4877 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4878 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4879 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4880 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4881 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4882 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4883 def test_process_vdu_params_wthout_persistent_storage(
4885 mock_prepare_vdu_affinity_group_list
,
4886 mock_add_persistent_ordinary_disks_to_disk_list
,
4887 mock_add_persistent_root_disk_to_disk_list
,
4888 mock_find_persistent_volumes
,
4889 mock_find_persistent_root_volumes
,
4890 mock_prepare_vdu_ssh_keys
,
4891 mock_prepare_vdu_cloud_init
,
4892 mock_prepare_vdu_interfaces
,
4893 mock_locate_vdu_interfaces
,
4894 mock_sort_vdu_interfaces
,
4896 """There is not any persistent storage."""
4897 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4900 target_vdu
["interfaces"] = interfaces_wth_all_positions
# Stub the return values of the patched helpers.
4901 mock_prepare_vdu_cloud_init
.return_value
= {}
4902 mock_prepare_vdu_affinity_group_list
.return_value
= []
# Work on a copy so the shared module-level kwargs fixture is not mutated.
4904 new_kwargs
= deepcopy(kwargs
)
4909 "tasks_by_target_record_id": {},
4913 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
4914 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4915 db
.get_one
.return_value
= vnfd
4916 result
= Ns
._process
_vdu
_params
(
4917 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# Both add-persistent-disk helpers run once; find_persistent_volumes does not.
4919 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4920 mock_locate_vdu_interfaces
.assert_not_called()
4921 mock_prepare_vdu_cloud_init
.assert_called_once()
4922 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4923 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4924 mock_prepare_vdu_interfaces
.assert_called_once_with(
4926 expected_extra_dict_copy
,
4933 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4934 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4935 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4936 mock_find_persistent_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when only some interfaces declare a
# "position": the partial-locate helper must be used instead of the sort
# helper.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
4938 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4939 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4940 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4941 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4942 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4943 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4944 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4945 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4946 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4947 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4948 def test_process_vdu_params_interfaces_partially_located(
4950 mock_prepare_vdu_affinity_group_list
,
4951 mock_add_persistent_ordinary_disks_to_disk_list
,
4952 mock_add_persistent_root_disk_to_disk_list
,
4953 mock_find_persistent_volumes
,
4954 mock_find_persistent_root_volumes
,
4955 mock_prepare_vdu_ssh_keys
,
4956 mock_prepare_vdu_cloud_init
,
4957 mock_prepare_vdu_interfaces
,
4958 mock_locate_vdu_interfaces
,
4959 mock_sort_vdu_interfaces
,
4961 """Some interfaces have position."""
4962 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
# Interface list for this scenario; the "net2" entry carries position 2.
4965 target_vdu
["interfaces"] = [
4968 "ns-vld-id": "net1",
4970 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
4973 "ns-vld-id": "mgmtnet",
# Stub the return values of the patched helpers.
4976 mock_prepare_vdu_cloud_init
.return_value
= {}
4977 mock_prepare_vdu_affinity_group_list
.return_value
= []
4978 persistent_root_disk
= {
4979 "persistent-root-volume": {
4980 "image_id": "ubuntu20.04",
4984 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
# Work on a copy so the shared module-level kwargs fixture is not mutated.
4986 new_kwargs
= deepcopy(kwargs
)
4991 "tasks_by_target_record_id": {},
4996 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4997 db
.get_one
.return_value
= vnfd
4998 result
= Ns
._process
_vdu
_params
(
4999 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5001 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
# Partial-locate helper is used; the sort helper must stay untouched.
5002 mock_sort_vdu_interfaces
.assert_not_called()
5003 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5004 mock_prepare_vdu_cloud_init
.assert_called_once()
5005 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5006 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5007 mock_prepare_vdu_interfaces
.assert_called_once_with(
5009 expected_extra_dict_copy
,
5016 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5017 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5018 mock_prepare_vdu_affinity_group_list
.assert_called_once()
# Neither find_persistent_* helper is expected to be called here, even
# though a return value was configured for find_persistent_root_volumes.
5019 mock_find_persistent_volumes
.assert_not_called()
5020 mock_find_persistent_root_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params with the interfaces_wthout_positions
# fixture: the partial-locate helper must be used instead of the sort helper.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
5022 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5023 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5024 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5025 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5026 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5027 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5028 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5029 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5030 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5031 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5032 def test_process_vdu_params_no_interface_position(
5034 mock_prepare_vdu_affinity_group_list
,
5035 mock_add_persistent_ordinary_disks_to_disk_list
,
5036 mock_add_persistent_root_disk_to_disk_list
,
5037 mock_find_persistent_volumes
,
5038 mock_find_persistent_root_volumes
,
5039 mock_prepare_vdu_ssh_keys
,
5040 mock_prepare_vdu_cloud_init
,
5041 mock_prepare_vdu_interfaces
,
5042 mock_locate_vdu_interfaces
,
5043 mock_sort_vdu_interfaces
,
5045 """Interfaces do not have position."""
5046 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5049 target_vdu
["interfaces"] = interfaces_wthout_positions
# Stub the return values of the patched helpers.
5050 mock_prepare_vdu_cloud_init
.return_value
= {}
5051 mock_prepare_vdu_affinity_group_list
.return_value
= []
5052 persistent_root_disk
= {
5053 "persistent-root-volume": {
5054 "image_id": "ubuntu20.04",
5058 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
# Work on a copy so the shared module-level kwargs fixture is not mutated.
5059 new_kwargs
= deepcopy(kwargs
)
5064 "tasks_by_target_record_id": {},
5069 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5070 db
.get_one
.return_value
= vnfd
5071 result
= Ns
._process
_vdu
_params
(
5072 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5074 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
# Partial-locate helper is used; the sort helper must stay untouched.
5075 mock_sort_vdu_interfaces
.assert_not_called()
5076 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5077 mock_prepare_vdu_cloud_init
.assert_called_once()
5078 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5079 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5080 mock_prepare_vdu_interfaces
.assert_called_once_with(
5082 expected_extra_dict_copy
,
5089 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5090 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5091 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5092 mock_find_persistent_volumes
.assert_not_called()
5093 mock_find_persistent_root_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when the patched _prepare_vdu_interfaces
# helper raises TypeError: the call must raise, and the cloud-init, ssh-keys,
# affinity-group and persistent-disk helpers must never be reached.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
5095 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5096 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5097 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5098 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5099 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5100 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5101 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5102 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5103 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5104 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5105 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5107 mock_prepare_vdu_affinity_group_list
,
5108 mock_add_persistent_ordinary_disks_to_disk_list
,
5109 mock_add_persistent_root_disk_to_disk_list
,
5110 mock_find_persistent_volumes
,
5111 mock_find_persistent_root_volumes
,
5112 mock_prepare_vdu_ssh_keys
,
5113 mock_prepare_vdu_cloud_init
,
5114 mock_prepare_vdu_interfaces
,
5115 mock_locate_vdu_interfaces
,
5116 mock_sort_vdu_interfaces
,
5118 """Prepare vdu interfaces method raises exception."""
5119 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5122 target_vdu
["interfaces"] = interfaces_wthout_positions
# Stub the return values of the patched helpers.
5123 mock_prepare_vdu_cloud_init
.return_value
= {}
5124 mock_prepare_vdu_affinity_group_list
.return_value
= []
5125 persistent_root_disk
= {
5126 "persistent-root-volume": {
5127 "image_id": "ubuntu20.04",
5131 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
# Work on a copy so the shared module-level kwargs fixture is not mutated.
5132 new_kwargs
= deepcopy(kwargs
)
5137 "tasks_by_target_record_id": {},
# Make the interface-preparation helper fail.
5141 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5143 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5144 db
.get_one
.return_value
= vnfd
5145 with self
.assertRaises(Exception) as err
:
5146 Ns
._process
_vdu
_params
(
5147 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# NOTE(review): `err` is the assertRaises context manager, so type(err) is
# never TypeError. If this check sits inside the with-block it is never
# reached once the call raises; if it sits after it, it cannot pass.
# Comparing type(err.exception), or using assertRaises(TypeError), would
# actually pin the exception type - confirm intent.
5149 self
.assertEqual(type(err
), TypeError)
# Processing stops at the failing helper: nothing after it may be called.
5150 mock_sort_vdu_interfaces
.assert_not_called()
5151 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5152 mock_prepare_vdu_cloud_init
.assert_not_called()
5153 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5154 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5155 mock_prepare_vdu_interfaces
.assert_called_once()
5156 mock_prepare_vdu_ssh_keys
.assert_not_called()
5157 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5158 mock_find_persistent_volumes
.assert_not_called()
5159 mock_find_persistent_root_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when the patched
# _add_persistent_root_disk_to_disk_list helper raises KeyError: the helpers
# before it are called once, while the ordinary-disk, affinity-group and
# find_persistent_* helpers must not be called.
# @patch decorators apply bottom-up: the last decorator supplies the first
# mock argument.
5161 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5162 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5163 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5164 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5165 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5166 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5167 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5168 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5169 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5170 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5171 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5173 mock_prepare_vdu_affinity_group_list
,
5174 mock_add_persistent_ordinary_disks_to_disk_list
,
5175 mock_add_persistent_root_disk_to_disk_list
,
5176 mock_find_persistent_volumes
,
5177 mock_find_persistent_root_volumes
,
5178 mock_prepare_vdu_ssh_keys
,
5179 mock_prepare_vdu_cloud_init
,
5180 mock_prepare_vdu_interfaces
,
5181 mock_locate_vdu_interfaces
,
5182 mock_sort_vdu_interfaces
,
5184 """Add persistent root disk method raises exception."""
5185 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5188 target_vdu
["interfaces"] = interfaces_wthout_positions
# Stub helper return values and make the root-disk helper fail.
5189 mock_prepare_vdu_cloud_init
.return_value
= {}
5190 mock_prepare_vdu_affinity_group_list
.return_value
= []
5191 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
# Work on a copy so the shared module-level kwargs fixture is not mutated.
5192 new_kwargs
= deepcopy(kwargs
)
5197 "tasks_by_target_record_id": {},
5202 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5203 db
.get_one
.return_value
= vnfd
5204 with self
.assertRaises(Exception) as err
:
5205 Ns
._process
_vdu
_params
(
5206 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# NOTE(review): `err` is the assertRaises context manager, so type(err) is
# never KeyError. If this check sits inside the with-block it is never
# reached once the call raises; if it sits after it, it cannot pass.
# Comparing type(err.exception), or using assertRaises(KeyError), would
# actually pin the exception type - confirm intent.
5208 self
.assertEqual(type(err
), KeyError)
# Helpers before the failing one ran once; later ones must not have run.
5209 mock_sort_vdu_interfaces
.assert_not_called()
5210 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5211 mock_prepare_vdu_cloud_init
.assert_called_once()
5212 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5213 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5214 mock_prepare_vdu_interfaces
.assert_called_once_with(
5218 f
"{ns_preffix}:image.0",
5219 f
"{ns_preffix}:flavor.0",
5229 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5230 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5231 mock_find_persistent_volumes
.assert_not_called()
5232 mock_find_persistent_root_volumes
.assert_not_called()
def test_select_persistent_root_disk(self):
    """The root volume listed first on the VDU is returned as the root disk."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    # Index 1 of the VNFD storage descriptors is "persistent-root-volume".
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    expected_result = vsd
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertEqual(result, expected_result)
def test_select_persistent_root_disk_first_vsd_is_different(self):
    """VDU first virtual-storage-desc is different than vsd id.

    The candidate descriptor is the root volume, but the VDU lists another
    volume first, so no root disk is expected.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    # Index 1 of the VNFD storage descriptors is "persistent-root-volume".
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    expected_result = None
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertEqual(result, expected_result)
def test_select_persistent_root_disk_vsd_is_not_persistent(self):
    """vsd type is not persistent.

    The root volume is listed first on the VDU so that the storage type is
    the only possible reason for the ``None`` result. With another volume
    listed first the test would pass regardless of the type, which is the
    case already covered by
    test_select_persistent_root_disk_first_vsd_is_different.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    # Index 1 of the VNFD storage descriptors is "persistent-root-volume".
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vsd["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertIsNone(result)
def test_select_persistent_root_disk_vsd_does_not_have_size(self):
    """vsd size is None.

    The root volume is listed first on the VDU so that the missing size is
    the only possible reason for the ``None`` result. With another volume
    listed first the test would pass regardless of the size, which is the
    case already covered by
    test_select_persistent_root_disk_first_vsd_is_different.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    # Index 1 of the VNFD storage descriptors is "persistent-root-volume".
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vsd["size-of-storage"] = None
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertIsNone(result)
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """VDU fixture is passed without a "virtual-storage-desc" override.

    No root disk is expected for the root-volume descriptor in that case.
    """
    vdu_record = deepcopy(target_vdu_wth_persistent_storage)
    storage_descriptors = deepcopy(vnfd_wth_persistent_storage)[
        "virtual-storage-desc"
    ]
    root_descriptor = storage_descriptors[1]
    selected = Ns._select_persistent_root_disk(root_descriptor, vdu_record)
    self.assertEqual(selected, None)
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """Passing the whole descriptor list instead of one dict raises AttributeError."""
    vdu_record = deepcopy(target_vdu_wth_persistent_storage)
    descriptor_list = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
    # Callable form of assertRaises: same check as the context-manager form.
    self.assertRaises(
        AttributeError,
        Ns._select_persistent_root_disk,
        descriptor_list,
        vdu_record,
    )