1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
# Module authorship metadata (plain strings; nothing in the visible code reads them).
__author__ = "Eduardo Sousa"
__date__ = "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
76 "id": "ephemeral-volume",
77 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
78 "size-of-storage": "1",
84 "path": "/app/storage/",
89 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
90 task_by_target_record_id
= {
91 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
92 "extra_dict": {"params": {"net_type": "SR-IOV"}}
95 interfaces_wthout_positions
= [
106 "ns-vld-id": "mgmtnet",
109 interfaces_wth_all_positions
= [
122 "ns-vld-id": "mgmtnet",
126 target_vdu_wth_persistent_storage
= {
127 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
130 "vdu-name": "several_volumes-VM",
134 "ns-vld-id": "mgmtnet",
137 "virtual-storages": [
139 "id": "persistent-volume2",
140 "size-of-storage": "10",
141 "type-of-storage": "persistent-storage:persistent-storage",
144 "id": "persistent-root-volume",
145 "size-of-storage": "10",
146 "type-of-storage": "persistent-storage:persistent-storage",
149 "id": "ephemeral-volume",
150 "size-of-storage": "1",
151 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
# Shared test doubles. Each mock gets its own name so that reprs and assertion
# failure messages identify which collaborator was involved.
db = MagicMock(name="database mock")
# NOTE(review): presumably stands in for the filesystem storage dependency of Ns;
# it was previously labelled "database mock" (copy-paste from `db`).
fs = MagicMock(name="filesystem mock")

# Record identifiers shared by the fixtures and expectations in this module.
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"

# Target-record prefixes are derived from the ids above so they cannot drift apart.
ns_preffix = f"nsrs:{nsr_id}"
vnf_preffix = f"vnfrs:{vnfr_id}"
162 "name": "sample_name",
164 expected_extra_dict
= {
166 f
"{ns_preffix}:image.0",
167 f
"{ns_preffix}:flavor.0",
170 "affinity_group_list": [],
171 "availability_zone_index": None,
172 "availability_zone_list": None,
173 "cloud_config": None,
174 "description": "several_volumes-VM",
176 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
177 "image_id": f
"TASK-{ns_preffix}:image.0",
178 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
184 expected_extra_dict2
= {
186 f
"{ns_preffix}:image.0",
187 f
"{ns_preffix}:flavor.0",
190 "affinity_group_list": [],
191 "availability_zone_index": None,
192 "availability_zone_list": None,
193 "cloud_config": None,
194 "description": "without_volumes-VM",
196 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
197 "image_id": f
"TASK-{ns_preffix}:image.0",
198 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
203 tasks_by_target_record_id
= {
204 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
207 "net_type": "SR-IOV",
214 "vdu2cloud_init": {},
216 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
217 "member-vnf-index-ref": "vnf-several-volumes",
220 vnfd_wthout_persistent_storage
= {
221 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
225 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
228 "id": "without_volumes-vnf",
229 "product-name": "without_volumes-vnf",
232 "id": "without_volumes-VM",
233 "name": "without_volumes-VM",
234 "sw-image-desc": "ubuntu20.04",
235 "alternative-sw-image-desc": [
239 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
243 "virtual-storage-desc": [
244 {"id": "root-volume", "size-of-storage": "10"},
246 "id": "ephemeral-volume",
247 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
248 "size-of-storage": "1",
254 "path": "/app/storage/",
260 target_vdu_wthout_persistent_storage
= {
261 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
264 "vdu-name": "without_volumes-VM",
268 "ns-vld-id": "mgmtnet",
271 "virtual-storages": [
274 "size-of-storage": "10",
277 "id": "ephemeral-volume",
278 "size-of-storage": "1",
279 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
283 cloud_init_content
= """
288 overwrite: {{is_override}}
291 - [ sh, -xc, "echo $(date) '{{command}}'" ]
302 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
class CopyingMock(MagicMock):
    """MagicMock that is invoked with deep copies of its call arguments.

    Because the copies are what reach ``MagicMock.__call__``, a caller that
    mutates an argument after the call does not alter what the mock was
    called with.
    """

    def __call__(self, *args, **kwargs):
        # Snapshot positional and keyword arguments before delegating.
        copied_args = deepcopy(args)
        copied_kwargs = deepcopy(kwargs)
        return super().__call__(*copied_args, **copied_kwargs)
313 class TestNs(unittest
.TestCase
):
317 def test__create_task_without_extra_dict(self
):
319 "target_id": "vim_openstack_1",
320 "action_id": "123456",
322 "task_id": "123456:1",
323 "status": "SCHEDULED",
326 "target_record": "test_target_record",
327 "target_record_id": "test_target_record_id",
330 "action_id": "123456",
335 task
= Ns
._create
_task
(
336 deployment_info
=deployment_info
,
337 target_id
="vim_openstack_1",
340 target_record
="test_target_record",
341 target_record_id
="test_target_record_id",
344 self
.assertEqual(deployment_info
.get("task_index"), 2)
345 self
.assertDictEqual(task
, expected_result
)
347 def test__create_task(self
):
349 "target_id": "vim_openstack_1",
350 "action_id": "123456",
352 "task_id": "123456:1",
353 "status": "SCHEDULED",
356 "target_record": "test_target_record",
357 "target_record_id": "test_target_record_id",
358 # values coming from extra_dict
359 "params": "test_params",
360 "find_params": "test_find_params",
361 "depends_on": "test_depends_on",
364 "action_id": "123456",
369 task
= Ns
._create
_task
(
370 deployment_info
=deployment_info
,
371 target_id
="vim_openstack_1",
374 target_record
="test_target_record",
375 target_record_id
="test_target_record_id",
377 "params": "test_params",
378 "find_params": "test_find_params",
379 "depends_on": "test_depends_on",
383 self
.assertEqual(deployment_info
.get("task_index"), 2)
384 self
.assertDictEqual(task
, expected_result
)
386 @patch("osm_ng_ro.ns.time")
387 def test__create_ro_task(self
, mock_time
: Mock
):
388 now
= 1637324838.994551
389 mock_time
.return_value
= now
391 "target_id": "vim_openstack_1",
392 "action_id": "123456",
394 "task_id": "123456:1",
395 "status": "SCHEDULED",
398 "target_record": "test_target_record",
399 "target_record_id": "test_target_record_id",
400 # values coming from extra_dict
401 "params": "test_params",
402 "find_params": "test_find_params",
403 "depends_on": "test_depends_on",
409 "target_id": "vim_openstack_1",
412 "created_items": None,
426 ro_task
= Ns
._create
_ro
_task
(
427 target_id
="vim_openstack_1",
431 self
.assertDictEqual(ro_task
, expected_result
)
433 def test__process_image_params_with_empty_target_image(self
):
439 result
= Ns
._process
_image
_params
(
440 target_image
=target_image
,
443 target_record_id
=None,
446 self
.assertDictEqual(expected_result
, result
)
448 def test__process_image_params_with_wrong_target_image(self
):
453 "no_image": "to_see_here",
456 result
= Ns
._process
_image
_params
(
457 target_image
=target_image
,
460 target_record_id
=None,
463 self
.assertDictEqual(expected_result
, result
)
465 def test__process_image_params_with_image(self
):
477 result
= Ns
._process
_image
_params
(
478 target_image
=target_image
,
481 target_record_id
=None,
484 self
.assertDictEqual(expected_result
, result
)
486 def test__process_image_params_with_vim_image_id(self
):
495 "vim_image_id": "123456",
498 result
= Ns
._process
_image
_params
(
499 target_image
=target_image
,
502 target_record_id
=None,
505 self
.assertDictEqual(expected_result
, result
)
507 def test__process_image_params_with_image_checksum(self
):
511 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
516 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
519 result
= Ns
._process
_image
_params
(
520 target_image
=target_image
,
523 target_record_id
=None,
526 self
.assertDictEqual(expected_result
, result
)
528 def test__get_resource_allocation_params_with_empty_target_image(self
):
530 quota_descriptor
= {}
532 result
= Ns
._get
_resource
_allocation
_params
(
533 quota_descriptor
=quota_descriptor
,
536 self
.assertDictEqual(expected_result
, result
)
538 def test__get_resource_allocation_params_with_wrong_target_image(self
):
541 "no_quota": "present_here",
544 result
= Ns
._get
_resource
_allocation
_params
(
545 quota_descriptor
=quota_descriptor
,
548 self
.assertDictEqual(expected_result
, result
)
550 def test__get_resource_allocation_params_with_limit(self
):
558 result
= Ns
._get
_resource
_allocation
_params
(
559 quota_descriptor
=quota_descriptor
,
562 self
.assertDictEqual(expected_result
, result
)
564 def test__get_resource_allocation_params_with_reserve(self
):
572 result
= Ns
._get
_resource
_allocation
_params
(
573 quota_descriptor
=quota_descriptor
,
576 self
.assertDictEqual(expected_result
, result
)
578 def test__get_resource_allocation_params_with_shares(self
):
586 result
= Ns
._get
_resource
_allocation
_params
(
587 quota_descriptor
=quota_descriptor
,
590 self
.assertDictEqual(expected_result
, result
)
592 def test__get_resource_allocation_params(self
):
604 result
= Ns
._get
_resource
_allocation
_params
(
605 quota_descriptor
=quota_descriptor
,
608 self
.assertDictEqual(expected_result
, result
)
610 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
611 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
619 result
= Ns
._process
_guest
_epa
_quota
_params
(
620 guest_epa_quota
=guest_epa_quota
,
621 epa_vcpu_set
=epa_vcpu_set
,
624 self
.assertDictEqual(expected_result
, result
)
625 self
.assertFalse(resource_allocation
.called
)
627 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
628 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
636 result
= Ns
._process
_guest
_epa
_quota
_params
(
637 guest_epa_quota
=guest_epa_quota
,
638 epa_vcpu_set
=epa_vcpu_set
,
641 self
.assertDictEqual(expected_result
, result
)
642 self
.assertFalse(resource_allocation
.called
)
644 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
645 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
651 "no-quota": "nothing",
655 result
= Ns
._process
_guest
_epa
_quota
_params
(
656 guest_epa_quota
=guest_epa_quota
,
657 epa_vcpu_set
=epa_vcpu_set
,
660 self
.assertDictEqual(expected_result
, result
)
661 self
.assertFalse(resource_allocation
.called
)
663 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
664 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
670 "no-quota": "nothing",
674 result
= Ns
._process
_guest
_epa
_quota
_params
(
675 guest_epa_quota
=guest_epa_quota
,
676 epa_vcpu_set
=epa_vcpu_set
,
679 self
.assertDictEqual(expected_result
, result
)
680 self
.assertFalse(resource_allocation
.called
)
682 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
683 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
697 result
= Ns
._process
_guest
_epa
_quota
_params
(
698 guest_epa_quota
=guest_epa_quota
,
699 epa_vcpu_set
=epa_vcpu_set
,
702 self
.assertDictEqual(expected_result
, result
)
703 self
.assertFalse(resource_allocation
.called
)
705 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
706 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
726 resource_allocation_param
= {
731 resource_allocation
.return_value
= {
737 result
= Ns
._process
_guest
_epa
_quota
_params
(
738 guest_epa_quota
=guest_epa_quota
,
739 epa_vcpu_set
=epa_vcpu_set
,
742 resource_allocation
.assert_called_once_with(resource_allocation_param
)
743 self
.assertDictEqual(expected_result
, result
)
745 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
746 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
766 resource_allocation_param
= {
771 resource_allocation
.return_value
= {
777 result
= Ns
._process
_guest
_epa
_quota
_params
(
778 guest_epa_quota
=guest_epa_quota
,
779 epa_vcpu_set
=epa_vcpu_set
,
782 resource_allocation
.assert_called_once_with(resource_allocation_param
)
783 self
.assertDictEqual(expected_result
, result
)
785 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
786 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
806 resource_allocation_param
= {
811 resource_allocation
.return_value
= {
817 result
= Ns
._process
_guest
_epa
_quota
_params
(
818 guest_epa_quota
=guest_epa_quota
,
819 epa_vcpu_set
=epa_vcpu_set
,
822 resource_allocation
.assert_called_once_with(resource_allocation_param
)
823 self
.assertDictEqual(expected_result
, result
)
825 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
826 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
846 resource_allocation_param
= {
851 resource_allocation
.return_value
= {
857 result
= Ns
._process
_guest
_epa
_quota
_params
(
858 guest_epa_quota
=guest_epa_quota
,
859 epa_vcpu_set
=epa_vcpu_set
,
862 resource_allocation
.assert_called_once_with(resource_allocation_param
)
863 self
.assertDictEqual(expected_result
, result
)
865 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
866 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
886 resource_allocation_param
= {
891 resource_allocation
.return_value
= {
897 result
= Ns
._process
_guest
_epa
_quota
_params
(
898 guest_epa_quota
=guest_epa_quota
,
899 epa_vcpu_set
=epa_vcpu_set
,
902 resource_allocation
.assert_called_once_with(resource_allocation_param
)
903 self
.assertDictEqual(expected_result
, result
)
905 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
906 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
926 resource_allocation_param
= {
931 resource_allocation
.return_value
= {
937 result
= Ns
._process
_guest
_epa
_quota
_params
(
938 guest_epa_quota
=guest_epa_quota
,
939 epa_vcpu_set
=epa_vcpu_set
,
942 resource_allocation
.assert_called_once_with(resource_allocation_param
)
943 self
.assertDictEqual(expected_result
, result
)
945 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
946 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
966 resource_allocation_param
= {
971 resource_allocation
.return_value
= {
977 result
= Ns
._process
_guest
_epa
_quota
_params
(
978 guest_epa_quota
=guest_epa_quota
,
979 epa_vcpu_set
=epa_vcpu_set
,
982 resource_allocation
.assert_called_once_with(resource_allocation_param
)
983 self
.assertDictEqual(expected_result
, result
)
985 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
986 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1031 resource_allocation
.return_value
= {
1037 result
= Ns
._process
_guest
_epa
_quota
_params
(
1038 guest_epa_quota
=guest_epa_quota
,
1039 epa_vcpu_set
=epa_vcpu_set
,
1042 self
.assertTrue(resource_allocation
.called
)
1043 self
.assertDictEqual(expected_result
, result
)
1045 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1046 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1048 resource_allocation
,
1094 epa_vcpu_set
= False
1096 resource_allocation
.return_value
= {
1102 result
= Ns
._process
_guest
_epa
_quota
_params
(
1103 guest_epa_quota
=guest_epa_quota
,
1104 epa_vcpu_set
=epa_vcpu_set
,
1107 self
.assertTrue(resource_allocation
.called
)
1108 self
.assertDictEqual(expected_result
, result
)
def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    # An empty guest EPA descriptor is expected to produce an empty NUMA
    # list and a False vCPU-set flag.
    numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
        guest_epa_quota={},
    )

    self.assertEqual([], numa_result)
    self.assertEqual(False, epa_vcpu_set_result)
def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    # A descriptor that carries only an unrelated key is expected to
    # produce an empty NUMA list and a False vCPU-set flag.
    unrelated_descriptor = {"no_nume": "here"}

    numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_descriptor,
    )

    self.assertEqual([], numa_result)
    self.assertEqual(False, epa_vcpu_set_result)
def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    # A "numa-node-policy" entry that is present but empty is expected to
    # produce an empty NUMA list and a False vCPU-set flag.
    empty_policy_descriptor = {"numa-node-policy": {}}

    numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_policy_descriptor,
    )

    self.assertEqual([], numa_result)
    self.assertEqual(False, epa_vcpu_set_result)
1145 def test__process_guest_epa_numa_params_with_no_node(self
):
1146 expected_numa_result
= []
1147 expected_epa_vcpu_set_result
= False
1149 "numa-node-policy": {
1154 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1155 guest_epa_quota
=guest_epa_quota
,
1158 self
.assertEqual(expected_numa_result
, numa_result
)
1159 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1161 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1162 expected_numa_result
= [{"cores": 3}]
1163 expected_epa_vcpu_set_result
= True
1165 "numa-node-policy": {
1174 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1175 guest_epa_quota
=guest_epa_quota
,
1178 self
.assertEqual(expected_numa_result
, numa_result
)
1179 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1181 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1182 expected_numa_result
= [{"paired_threads": 3}]
1183 expected_epa_vcpu_set_result
= True
1185 "numa-node-policy": {
1188 "paired-threads": {"num-paired-threads": "3"},
1194 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1195 guest_epa_quota
=guest_epa_quota
,
1198 self
.assertEqual(expected_numa_result
, numa_result
)
1199 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1201 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1202 expected_numa_result
= [
1204 "paired-threads-id": [("0", "1"), ("4", "5")],
1207 expected_epa_vcpu_set_result
= False
1209 "numa-node-policy": {
1213 "paired-thread-ids": [
1229 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1230 guest_epa_quota
=guest_epa_quota
,
1233 self
.assertEqual(expected_numa_result
, numa_result
)
1234 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1236 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1237 expected_numa_result
= [{"threads": 3}]
1238 expected_epa_vcpu_set_result
= True
1240 "numa-node-policy": {
1249 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1250 guest_epa_quota
=guest_epa_quota
,
1253 self
.assertEqual(expected_numa_result
, numa_result
)
1254 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1256 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1257 expected_numa_result
= [{"memory": 2}]
1258 expected_epa_vcpu_set_result
= False
1260 "numa-node-policy": {
1269 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1270 guest_epa_quota
=guest_epa_quota
,
1273 self
.assertEqual(expected_numa_result
, numa_result
)
1274 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1276 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1277 expected_numa_result
= [
1283 expected_epa_vcpu_set_result
= False
1285 "numa-node-policy": {
1286 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1290 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1291 guest_epa_quota
=guest_epa_quota
,
1294 self
.assertEqual(expected_numa_result
, numa_result
)
1295 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1297 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1298 expected_numa_result
= [
1309 expected_epa_vcpu_set_result
= False
1311 "numa-node-policy": {
1313 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1314 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1319 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1320 guest_epa_quota
=guest_epa_quota
,
1323 self
.assertEqual(expected_numa_result
, numa_result
)
1324 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1326 def test__process_guest_epa_numa_params_with_1_node(self
):
1327 expected_numa_result
= [
1332 "paired_threads": 3,
1333 "paired-threads-id": [("0", "1"), ("4", "5")],
1338 expected_epa_vcpu_set_result
= True
1340 "numa-node-policy": {
1345 "num-paired-threads": "3",
1346 "paired-thread-ids": [
1364 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1365 guest_epa_quota
=guest_epa_quota
,
1368 self
.assertEqual(expected_numa_result
, numa_result
)
1369 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1371 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1372 expected_numa_result
= [
1375 "paired_threads": 3,
1376 "paired-threads-id": [("0", "1"), ("4", "5")],
1382 "paired_threads": 7,
1383 "paired-threads-id": [("2", "3"), ("5", "6")],
1388 expected_epa_vcpu_set_result
= True
1390 "numa-node-policy": {
1395 "num-paired-threads": "3",
1396 "paired-thread-ids": [
1413 "num-paired-threads": "7",
1414 "paired-thread-ids": [
1432 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1433 guest_epa_quota
=guest_epa_quota
,
1436 self
.assertEqual(expected_numa_result
, numa_result
)
1437 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1439 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1440 expected_numa_result
= {}
1441 expected_epa_vcpu_set_result
= False
1442 guest_epa_quota
= {}
1444 epa_vcpu_set
= False
1446 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1447 guest_epa_quota
=guest_epa_quota
,
1448 vcpu_count
=vcpu_count
,
1449 epa_vcpu_set
=epa_vcpu_set
,
1452 self
.assertDictEqual(expected_numa_result
, numa_result
)
1453 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1455 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1456 expected_numa_result
= {}
1457 expected_epa_vcpu_set_result
= False
1459 "no-cpu-pinning-policy": "here",
1462 epa_vcpu_set
= False
1464 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1465 guest_epa_quota
=guest_epa_quota
,
1466 vcpu_count
=vcpu_count
,
1467 epa_vcpu_set
=epa_vcpu_set
,
1470 self
.assertDictEqual(expected_numa_result
, numa_result
)
1471 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1473 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1474 expected_numa_result
= {}
1475 expected_epa_vcpu_set_result
= True
1477 "cpu-pinning-policy": "DEDICATED",
1482 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1483 guest_epa_quota
=guest_epa_quota
,
1484 vcpu_count
=vcpu_count
,
1485 epa_vcpu_set
=epa_vcpu_set
,
1488 self
.assertDictEqual(expected_numa_result
, numa_result
)
1489 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1491 def test__process_guest_epa_cpu_pinning_params_with_threads(self
):
1492 expected_numa_result
= {"threads": 3}
1493 expected_epa_vcpu_set_result
= True
1495 "cpu-pinning-policy": "DEDICATED",
1496 "cpu-thread-pinning-policy": "PREFER",
1499 epa_vcpu_set
= False
1501 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1502 guest_epa_quota
=guest_epa_quota
,
1503 vcpu_count
=vcpu_count
,
1504 epa_vcpu_set
=epa_vcpu_set
,
1507 self
.assertDictEqual(expected_numa_result
, numa_result
)
1508 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1510 def test__process_guest_epa_cpu_pinning_params(self
):
1511 expected_numa_result
= {"cores": 3}
1512 expected_epa_vcpu_set_result
= True
1514 "cpu-pinning-policy": "DEDICATED",
1517 epa_vcpu_set
= False
1519 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1520 guest_epa_quota
=guest_epa_quota
,
1521 vcpu_count
=vcpu_count
,
1522 epa_vcpu_set
=epa_vcpu_set
,
1525 self
.assertDictEqual(expected_numa_result
, numa_result
)
1526 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1528 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1529 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1530 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1531 def test__process_guest_epa_params_with_empty_params(
1533 guest_epa_numa_params
,
1534 guest_epa_cpu_pinning_params
,
1535 guest_epa_quota_params
,
1537 expected_result
= {}
1540 result
= Ns
._process
_epa
_params
(
1541 target_flavor
=target_flavor
,
1544 self
.assertDictEqual(expected_result
, result
)
1545 self
.assertFalse(guest_epa_numa_params
.called
)
1546 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1547 self
.assertFalse(guest_epa_quota_params
.called
)
1549 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1550 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1551 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1552 def test__process_guest_epa_params_with_wrong_params(
1554 guest_epa_numa_params
,
1555 guest_epa_cpu_pinning_params
,
1556 guest_epa_quota_params
,
1558 expected_result
= {}
1560 "no-guest-epa": "here",
1563 result
= Ns
._process
_epa
_params
(
1564 target_flavor
=target_flavor
,
1567 self
.assertDictEqual(expected_result
, result
)
1568 self
.assertFalse(guest_epa_numa_params
.called
)
1569 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1570 self
.assertFalse(guest_epa_quota_params
.called
)
1572 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1573 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1574 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1575 def test__process_guest_epa_params(
1577 guest_epa_numa_params
,
1578 guest_epa_cpu_pinning_params
,
1579 guest_epa_quota_params
,
1582 "mem-policy": "STRICT",
1587 "numa-node-policy": {
1588 "mem-policy": "STRICT",
1593 guest_epa_numa_params
.return_value
= ({}, False)
1594 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1595 guest_epa_quota_params
.return_value
= {}
1597 result
= Ns
._process
_epa
_params
(
1598 target_flavor
=target_flavor
,
1601 self
.assertDictEqual(expected_result
, result
)
1602 self
.assertTrue(guest_epa_numa_params
.called
)
1603 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1604 self
.assertTrue(guest_epa_quota_params
.called
)
1606 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1607 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1608 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1609 def test__process_guest_epa_params_with_mempage_size(
1611 guest_epa_numa_params
,
1612 guest_epa_cpu_pinning_params
,
1613 guest_epa_quota_params
,
1616 "mempage-size": "1G",
1617 "mem-policy": "STRICT",
1622 "mempage-size": "1G",
1623 "numa-node-policy": {
1624 "mem-policy": "STRICT",
1629 guest_epa_numa_params
.return_value
= ({}, False)
1630 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1631 guest_epa_quota_params
.return_value
= {}
1633 result
= Ns
._process
_epa
_params
(
1634 target_flavor
=target_flavor
,
1637 self
.assertDictEqual(expected_result
, result
)
1638 self
.assertTrue(guest_epa_numa_params
.called
)
1639 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1640 self
.assertTrue(guest_epa_quota_params
.called
)
1642 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1643 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1644 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1645 def test__process_guest_epa_params_with_numa(
1647 guest_epa_numa_params
,
1648 guest_epa_cpu_pinning_params
,
1649 guest_epa_quota_params
,
1652 "mempage-size": "1G",
1653 "cpu-pinning-policy": "DEDICATED",
1654 "cpu-thread-pinning-policy": "PREFER",
1659 "paired-threads": 3,
1660 "paired-threads-id": [("0", "1"), ("4", "5")],
1664 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1665 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1666 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1667 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1672 "mempage-size": "1G",
1673 "cpu-pinning-policy": "DEDICATED",
1674 "cpu-thread-pinning-policy": "PREFER",
1675 "numa-node-policy": {
1680 "num-paired-threads": "3",
1681 "paired-thread-ids": [
1720 guest_epa_numa_params
.return_value
= (
1724 "paired-threads": 3,
1725 "paired-threads-id": [("0", "1"), ("4", "5")],
1732 guest_epa_cpu_pinning_params
.return_value
= (
1738 guest_epa_quota_params
.return_value
= {
1761 result
= Ns
._process
_epa
_params
(
1762 target_flavor
=target_flavor
,
1764 self
.assertEqual(expected_result
, result
)
1765 self
.assertTrue(guest_epa_numa_params
.called
)
1766 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1767 self
.assertTrue(guest_epa_quota_params
.called
)
1769 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1770 def test__process_flavor_params_with_empty_target_flavor(
1778 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1783 target_record_id
= ""
1785 with self
.assertRaises(KeyError):
1786 Ns
._process
_flavor
_params
(
1787 target_flavor
=target_flavor
,
1790 target_record_id
=target_record_id
,
1793 self
.assertFalse(epa_params
.called
)
1795 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1796 def test__process_flavor_params_with_wrong_target_flavor(
1801 "no-target-flavor": "here",
1805 target_record_id
= ""
1807 with self
.assertRaises(KeyError):
1808 Ns
._process
_flavor
_params
(
1809 target_flavor
=target_flavor
,
1812 target_record_id
=target_record_id
,
1815 self
.assertFalse(epa_params
.called
)
1817 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1818 def test__process_flavor_params_with_empty_indata(
1842 "memory-mb": "1024",
1847 target_record_id
= ""
1849 epa_params
.return_value
= {}
1851 result
= Ns
._process
_flavor
_params
(
1852 target_flavor
=target_flavor
,
1855 target_record_id
=target_record_id
,
1858 self
.assertTrue(epa_params
.called
)
1859 self
.assertDictEqual(result
, expected_result
)
1861 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1862 def test__process_flavor_params_with_wrong_indata(
1886 "memory-mb": "1024",
1893 target_record_id
= ""
1895 epa_params
.return_value
= {}
1897 result
= Ns
._process
_flavor
_params
(
1898 target_flavor
=target_flavor
,
1901 target_record_id
=target_record_id
,
1904 self
.assertTrue(epa_params
.called
)
1905 self
.assertDictEqual(result
, expected_result
)
1907 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1908 def test__process_flavor_params_with_ephemeral_disk(
1916 db
.get_one
.return_value
= {
1917 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1922 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1926 "id": "without_volumes-vnf",
1927 "product-name": "without_volumes-vnf",
1930 "id": "without_volumes-VM",
1931 "name": "without_volumes-VM",
1932 "sw-image-desc": "ubuntu20.04",
1933 "alternative-sw-image-desc": [
1935 "ubuntu20.04-azure",
1937 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
1941 "virtual-storage-desc": [
1942 {"id": "root-volume", "size-of-storage": "10"},
1944 "id": "ephemeral-volume",
1945 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1946 "size-of-storage": "1",
1952 "path": "/app/storage/",
1980 "memory-mb": "1024",
1988 "ns-flavor-id": "test_id",
1989 "virtual-storages": [
1991 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1992 "size-of-storage": "10",
1997 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2002 target_record_id
= ""
2004 epa_params
.return_value
= {}
2006 result
= Ns
._process
_flavor
_params
(
2007 target_flavor
=target_flavor
,
2010 target_record_id
=target_record_id
,
2014 self
.assertTrue(epa_params
.called
)
2015 self
.assertDictEqual(result
, expected_result
)
2017 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2018 def test__process_flavor_params_with_swap_disk(
2045 "memory-mb": "1024",
2053 "ns-flavor-id": "test_id",
2054 "virtual-storages": [
2056 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2057 "size-of-storage": "20",
2062 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2067 target_record_id
= ""
2069 epa_params
.return_value
= {}
2071 result
= Ns
._process
_flavor
_params
(
2072 target_flavor
=target_flavor
,
2075 target_record_id
=target_record_id
,
2078 self
.assertTrue(epa_params
.called
)
2079 self
.assertDictEqual(result
, expected_result
)
2081 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2082 def test__process_flavor_params_with_persistent_root_disk(
2090 db
.get_one
.return_value
= {
2091 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2096 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2100 "id": "several_volumes-vnf",
2101 "product-name": "several_volumes-vnf",
2104 "id": "several_volumes-VM",
2105 "name": "several_volumes-VM",
2106 "sw-image-desc": "ubuntu20.04",
2107 "alternative-sw-image-desc": [
2109 "ubuntu20.04-azure",
2111 "virtual-storage-desc": [
2112 "persistent-root-volume",
2117 "virtual-storage-desc": [
2119 "id": "persistent-root-volume",
2120 "type-of-storage": "persistent-storage:persistent-storage",
2121 "size-of-storage": "10",
2127 "path": "/app/storage/",
2153 "memory-mb": "1024",
2161 "vdu-name": "several_volumes-VM",
2162 "ns-flavor-id": "test_id",
2163 "virtual-storages": [
2165 "type-of-storage": "persistent-storage:persistent-storage",
2166 "size-of-storage": "10",
2171 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2176 target_record_id
= ""
2178 epa_params
.return_value
= {}
2180 result
= Ns
._process
_flavor
_params
(
2181 target_flavor
=target_flavor
,
2184 target_record_id
=target_record_id
,
2188 self
.assertTrue(epa_params
.called
)
2189 self
.assertDictEqual(result
, expected_result
)
2191 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2192 def test__process_flavor_params_with_epa_params(
2203 "numa": "there-is-numa-here",
2214 "numa": "there-is-numa-here",
2223 "memory-mb": "1024",
2231 "ns-flavor-id": "test_id",
2234 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2239 target_record_id
= ""
2241 epa_params
.return_value
= {
2242 "numa": "there-is-numa-here",
2245 result
= Ns
._process
_flavor
_params
(
2246 target_flavor
=target_flavor
,
2249 target_record_id
=target_record_id
,
2252 self
.assertTrue(epa_params
.called
)
2253 self
.assertDictEqual(result
, expected_result
)
2255 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2256 def test__process_flavor_params(
2264 db
.get_one
.return_value
= {
2265 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2270 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2274 "id": "without_volumes-vnf",
2275 "product-name": "without_volumes-vnf",
2278 "id": "without_volumes-VM",
2279 "name": "without_volumes-VM",
2280 "sw-image-desc": "ubuntu20.04",
2281 "alternative-sw-image-desc": [
2283 "ubuntu20.04-azure",
2285 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2289 "virtual-storage-desc": [
2290 {"id": "root-volume", "size-of-storage": "10"},
2292 "id": "ephemeral-volume",
2293 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2294 "size-of-storage": "1",
2300 "path": "/app/storage/",
2315 "numa": "there-is-numa-here",
2328 "numa": "there-is-numa-here",
2337 "memory-mb": "1024",
2345 "ns-flavor-id": "test_id",
2346 "virtual-storages": [
2348 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2349 "size-of-storage": "10",
2352 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2353 "size-of-storage": "20",
2358 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2363 target_record_id
= ""
2365 epa_params
.return_value
= {
2366 "numa": "there-is-numa-here",
2369 result
= Ns
._process
_flavor
_params
(
2370 target_flavor
=target_flavor
,
2373 target_record_id
=target_record_id
,
2377 self
.assertTrue(epa_params
.called
)
2378 self
.assertDictEqual(result
, expected_result
)
2380 def test__ip_profile_to_ro_with_none(self
):
2383 result
= Ns
._ip
_profile
_to
_ro
(
2384 ip_profile
=ip_profile
,
2387 self
.assertIsNone(result
)
2389 def test__ip_profile_to_ro_with_empty_profile(self
):
2392 result
= Ns
._ip
_profile
_to
_ro
(
2393 ip_profile
=ip_profile
,
2396 self
.assertIsNone(result
)
2398 def test__ip_profile_to_ro_with_wrong_profile(self
):
2400 "no-profile": "here",
2403 "ip_version": "IPv4",
2404 "subnet_address": None,
2405 "gateway_address": None,
2406 "dhcp_enabled": False,
2407 "dhcp_start_address": None,
2411 result
= Ns
._ip
_profile
_to
_ro
(
2412 ip_profile
=ip_profile
,
2415 self
.assertDictEqual(expected_result
, result
)
2417 def test__ip_profile_to_ro_with_ipv4_profile(self
):
2419 "ip-version": "ipv4",
2420 "subnet-address": "192.168.0.0/24",
2421 "gateway-address": "192.168.0.254",
2424 "start-address": "192.168.0.10",
2429 "ip_version": "IPv4",
2430 "subnet_address": "192.168.0.0/24",
2431 "gateway_address": "192.168.0.254",
2432 "dhcp_enabled": True,
2433 "dhcp_start_address": "192.168.0.10",
2437 result
= Ns
._ip
_profile
_to
_ro
(
2438 ip_profile
=ip_profile
,
2441 self
.assertDictEqual(expected_result
, result
)
2443 def test__ip_profile_to_ro_with_ipv6_profile(self
):
2445 "ip-version": "ipv6",
2446 "subnet-address": "2001:0200:0001::/48",
2447 "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2450 "start-address": "2001:0200:0001::0010",
2455 "ip_version": "IPv6",
2456 "subnet_address": "2001:0200:0001::/48",
2457 "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2458 "dhcp_enabled": True,
2459 "dhcp_start_address": "2001:0200:0001::0010",
2463 result
= Ns
._ip
_profile
_to
_ro
(
2464 ip_profile
=ip_profile
,
2467 self
.assertDictEqual(expected_result
, result
)
2469 def test__ip_profile_to_ro_with_dns_server(self
):
2471 "ip-version": "ipv4",
2472 "subnet-address": "192.168.0.0/24",
2473 "gateway-address": "192.168.0.254",
2476 "start-address": "192.168.0.10",
2481 "address": "8.8.8.8",
2484 "address": "1.1.1.1",
2487 "address": "1.0.0.1",
2492 "ip_version": "IPv4",
2493 "subnet_address": "192.168.0.0/24",
2494 "gateway_address": "192.168.0.254",
2495 "dhcp_enabled": True,
2496 "dhcp_start_address": "192.168.0.10",
2498 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2501 result
= Ns
._ip
_profile
_to
_ro
(
2502 ip_profile
=ip_profile
,
2505 self
.assertDictEqual(expected_result
, result
)
2507 def test__ip_profile_to_ro_with_security_group(self
):
2509 "ip-version": "ipv4",
2510 "subnet-address": "192.168.0.0/24",
2511 "gateway-address": "192.168.0.254",
2514 "start-address": "192.168.0.10",
2518 "some-security-group": "here",
2522 "ip_version": "IPv4",
2523 "subnet_address": "192.168.0.0/24",
2524 "gateway_address": "192.168.0.254",
2525 "dhcp_enabled": True,
2526 "dhcp_start_address": "192.168.0.10",
2529 "some-security-group": "here",
2533 result
= Ns
._ip
_profile
_to
_ro
(
2534 ip_profile
=ip_profile
,
2537 self
.assertDictEqual(expected_result
, result
)
2539 def test__ip_profile_to_ro(self
):
2541 "ip-version": "ipv4",
2542 "subnet-address": "192.168.0.0/24",
2543 "gateway-address": "192.168.0.254",
2546 "start-address": "192.168.0.10",
2551 "address": "8.8.8.8",
2554 "address": "1.1.1.1",
2557 "address": "1.0.0.1",
2561 "some-security-group": "here",
2565 "ip_version": "IPv4",
2566 "subnet_address": "192.168.0.0/24",
2567 "gateway_address": "192.168.0.254",
2568 "dhcp_enabled": True,
2569 "dhcp_start_address": "192.168.0.10",
2571 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2573 "some-security-group": "here",
2577 result
= Ns
._ip
_profile
_to
_ro
(
2578 ip_profile
=ip_profile
,
2581 self
.assertDictEqual(expected_result
, result
)
2583 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2584 def test__process_net_params_with_empty_params(
2595 "provider_network": "some-profile-here",
2597 target_record_id
= ""
2600 "net_name": "ns-name-vld-name",
2601 "net_type": "bridge",
2603 "some_ip_profile": "here",
2605 "provider_network_profile": "some-profile-here",
2609 ip_profile_to_ro
.return_value
= {
2610 "some_ip_profile": "here",
2613 result
= Ns
._process
_net
_params
(
2614 target_vld
=target_vld
,
2617 target_record_id
=target_record_id
,
2620 self
.assertDictEqual(expected_result
, result
)
2621 self
.assertTrue(ip_profile_to_ro
.called
)
2623 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2624 def test__process_net_params_with_vim_info_sdn(
2636 "sdn-ports": ["some", "ports", "here"],
2637 "vlds": ["some", "vlds", "here"],
2640 target_record_id
= "vld.sdn.something"
2643 "sdn-ports": ["some", "ports", "here"],
2644 "vlds": ["some", "vlds", "here"],
2649 result
= Ns
._process
_net
_params
(
2650 target_vld
=target_vld
,
2653 target_record_id
=target_record_id
,
2656 self
.assertDictEqual(expected_result
, result
)
2657 self
.assertFalse(ip_profile_to_ro
.called
)
2659 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2660 def test__process_net_params_with_vim_info_sdn_target_vim(
2672 "sdn-ports": ["some", "ports", "here"],
2673 "vlds": ["some", "vlds", "here"],
2674 "target_vim": "some-vim",
2677 target_record_id
= "vld.sdn.something"
2679 "depends_on": ["some-vim vld.sdn"],
2681 "sdn-ports": ["some", "ports", "here"],
2682 "vlds": ["some", "vlds", "here"],
2683 "target_vim": "some-vim",
2688 result
= Ns
._process
_net
_params
(
2689 target_vld
=target_vld
,
2692 target_record_id
=target_record_id
,
2695 self
.assertDictEqual(expected_result
, result
)
2696 self
.assertFalse(ip_profile_to_ro
.called
)
2698 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2699 def test__process_net_params_with_vim_network_name(
2710 "vim_network_name": "some-network-name",
2712 target_record_id
= "vld.sdn.something"
2716 "name": "some-network-name",
2721 result
= Ns
._process
_net
_params
(
2722 target_vld
=target_vld
,
2725 target_record_id
=target_record_id
,
2728 self
.assertDictEqual(expected_result
, result
)
2729 self
.assertFalse(ip_profile_to_ro
.called
)
2731 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2732 def test__process_net_params_with_vim_network_id(
2743 "vim_network_id": "some-network-id",
2745 target_record_id
= "vld.sdn.something"
2749 "id": "some-network-id",
2754 result
= Ns
._process
_net
_params
(
2755 target_vld
=target_vld
,
2758 target_record_id
=target_record_id
,
2761 self
.assertDictEqual(expected_result
, result
)
2762 self
.assertFalse(ip_profile_to_ro
.called
)
2764 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2765 def test__process_net_params_with_mgmt_network(
2772 "mgmt-network": "some-mgmt-network",
2778 target_record_id
= "vld.sdn.something"
2786 result
= Ns
._process
_net
_params
(
2787 target_vld
=target_vld
,
2790 target_record_id
=target_record_id
,
2793 self
.assertDictEqual(expected_result
, result
)
2794 self
.assertFalse(ip_profile_to_ro
.called
)
2796 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2797 def test__process_net_params_with_underlay_eline(
2803 "underlay": "some-underlay-here",
2810 "provider_network": "some-profile-here",
2812 target_record_id
= ""
2816 "some_ip_profile": "here",
2818 "net_name": "ns-name-vld-name",
2820 "provider_network_profile": "some-profile-here",
2824 ip_profile_to_ro
.return_value
= {
2825 "some_ip_profile": "here",
2828 result
= Ns
._process
_net
_params
(
2829 target_vld
=target_vld
,
2832 target_record_id
=target_record_id
,
2835 self
.assertDictEqual(expected_result
, result
)
2836 self
.assertTrue(ip_profile_to_ro
.called
)
2838 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2839 def test__process_net_params_with_underlay_elan(
2845 "underlay": "some-underlay-here",
2852 "provider_network": "some-profile-here",
2854 target_record_id
= ""
2858 "some_ip_profile": "here",
2860 "net_name": "ns-name-vld-name",
2862 "provider_network_profile": "some-profile-here",
2866 ip_profile_to_ro
.return_value
= {
2867 "some_ip_profile": "here",
2870 result
= Ns
._process
_net
_params
(
2871 target_vld
=target_vld
,
2874 target_record_id
=target_record_id
,
2877 self
.assertDictEqual(expected_result
, result
)
2878 self
.assertTrue(ip_profile_to_ro
.called
)
2880 def test__get_cloud_init_exception(self
):
2881 db_mock
= MagicMock(name
="database mock")
2886 with self
.assertRaises(NsException
):
2887 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2889 def test__get_cloud_init_file_fs_exception(self
):
2890 db_mock
= MagicMock(name
="database mock")
2893 location
= "vnfr_id_123456:file:test_file"
2894 db_mock
.get_one
.return_value
= {
2897 "folder": "/home/osm",
2898 "pkg-dir": "vnfr_test_dir",
2903 with self
.assertRaises(NsException
):
2904 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2906 def test__get_cloud_init_file(self
):
2907 db_mock
= MagicMock(name
="database mock")
2908 fs_mock
= MagicMock(name
="filesystem mock")
2909 file_mock
= MagicMock(name
="file mock")
2911 location
= "vnfr_id_123456:file:test_file"
2912 cloud_init_content
= "this is a cloud init file content"
2914 db_mock
.get_one
.return_value
= {
2917 "folder": "/home/osm",
2918 "pkg-dir": "vnfr_test_dir",
2922 fs_mock
.file_open
.return_value
= file_mock
2923 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2925 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2927 self
.assertEqual(cloud_init_content
, result
)
2929 def test__get_cloud_init_vdu(self
):
2930 db_mock
= MagicMock(name
="database mock")
2933 location
= "vnfr_id_123456:vdu:0"
2934 cloud_init_content
= "this is a cloud init file content"
2936 db_mock
.get_one
.return_value
= {
2939 "cloud-init": cloud_init_content
,
2944 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2946 self
.assertEqual(cloud_init_content
, result
)
2948 @patch("jinja2.Environment.__init__")
2949 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2950 cloud_init_content
= None
2954 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
2956 with self
.assertRaises(NsException
):
2958 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2961 @patch("jinja2.Environment.__init__")
2962 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
2963 cloud_init_content
= None
2967 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
2969 with self
.assertRaises(NsException
):
2971 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2974 @patch("jinja2.Environment.__init__")
2975 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
2976 cloud_init_content
= None
2980 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
2982 with self
.assertRaises(NsException
):
2984 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2987 def test_rendering_jinja2_temp_without_special_characters(self
):
2988 cloud_init_content
= """
2991 table_type: {{type}}
2993 overwrite: {{is_override}}
2996 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3000 "is_override": "False",
3001 "command": "; mkdir abc",
3003 context
= "cloud-init for VM"
3004 expected_result
= """
3012 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
3014 result
= Ns
._parse
_jinja
2(
3015 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3017 self
.assertEqual(result
, expected_result
)
3019 def test_rendering_jinja2_temp_with_special_characters(self
):
3020 cloud_init_content
= """
3023 table_type: {{type}}
3025 overwrite: {{is_override}}
3028 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3032 "is_override": "False",
3033 "command": "& rm -rf",
3035 context
= "cloud-init for VM"
3036 expected_result
= """
3044 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3046 result
= Ns
._parse
_jinja
2(
3047 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3049 self
.assertNotEqual(result
, expected_result
)
3051 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
3052 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
3053 mock_environment
.return_value
= Environment(
3054 undefined
=StrictUndefined
,
3055 autoescape
=select_autoescape(default_for_string
=False, default
=False),
3057 cloud_init_content
= """
3060 table_type: {{type}}
3062 overwrite: {{is_override}}
3065 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3069 "is_override": "False",
3070 "command": "& rm -rf /",
3072 context
= "cloud-init for VM"
3073 expected_result
= """
3081 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3083 result
= Ns
._parse
_jinja
2(
3084 cloud_init_content
=cloud_init_content
,
3088 self
.assertEqual(result
, expected_result
)
3090 @patch("osm_ng_ro.ns.Ns._assign_vim")
3091 def test__rebuild_start_stop_task(self
, assign_vim
):
3094 actions
= ["start", "stop", "rebuild"]
3095 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3096 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
3098 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3099 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3101 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3102 t
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3103 for action
in actions
:
3105 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3106 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3107 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3108 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
3109 "status": "SCHEDULED",
3112 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
3113 "target_record_id": t
,
3115 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3119 extra_dict
["params"] = {
3120 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3123 task
= self
.ns
.rebuild_start_stop_task(
3133 self
.assertEqual(task
.get("action_id"), action_id
)
3134 self
.assertEqual(task
.get("nsr_id"), nsr_id
)
3135 self
.assertEqual(task
.get("target_id"), target_vim
)
3136 self
.assertDictEqual(task
, expected_result
)
3138 @patch("osm_ng_ro.ns.Ns._assign_vim")
3139 def test_verticalscale_task(self
, assign_vim
):
3143 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3144 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3146 target_record_id
= (
3147 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3148 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3152 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3153 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3154 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3155 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3156 "status": "SCHEDULED",
3158 "item": "verticalscale",
3159 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3160 "target_record_id": target_record_id
,
3162 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3163 "flavor_dict": "flavor_dict",
3167 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3169 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3172 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3173 extra_dict
["params"] = {
3174 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3175 "flavor_dict": "flavor_dict",
3177 task
= self
.ns
.verticalscale_task(
3178 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3181 self
.assertDictEqual(task
, expected_result
)
3183 @patch("osm_ng_ro.ns.Ns._assign_vim")
3184 def test_migrate_task(self
, assign_vim
):
3188 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3189 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3191 target_record_id
= (
3192 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3193 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3197 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3198 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3199 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3200 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3201 "status": "SCHEDULED",
3204 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3205 "target_record_id": target_record_id
,
3207 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3208 "migrate_host": "migrateToHost",
3212 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3214 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3217 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3218 extra_dict
["params"] = {
3219 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3220 "migrate_host": "migrateToHost",
3222 task
= self
.ns
.migrate_task(
3223 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3226 self
.assertDictEqual(task
, expected_result
)
3229 class TestProcessVduParams(unittest
.TestCase
):
3232 self
.logger
= CopyingMock(autospec
=True)
3234 def test_find_persistent_root_volumes_empty_instantiation_vol_list(self
):
3235 """Find persistent root volume, instantiation_vol_list is empty."""
3236 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3237 target_vdu
= target_vdu_wth_persistent_storage
3238 vdu_instantiation_volumes_list
= []
3240 expected_persist_root_disk
= {
3241 "persistent-root-volume": {
3242 "image_id": "ubuntu20.04",
3246 expected_disk_list
= [
3248 "image_id": "ubuntu20.04",
3252 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3253 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3255 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3256 self
.assertEqual(disk_list
, expected_disk_list
)
3257 self
.assertEqual(len(disk_list
), 1)
3259 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(self
):
3260 """Find persistent root volume, always selects the first vsd as root volume."""
3261 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3262 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3263 "persistent-volume2",
3264 "persistent-root-volume",
3267 target_vdu
= target_vdu_wth_persistent_storage
3268 vdu_instantiation_volumes_list
= []
3270 expected_persist_root_disk
= {
3271 "persistent-volume2": {
3272 "image_id": "ubuntu20.04",
3276 expected_disk_list
= [
3278 "image_id": "ubuntu20.04",
3282 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3283 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3285 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3286 self
.assertEqual(disk_list
, expected_disk_list
)
3287 self
.assertEqual(len(disk_list
), 1)
3289 def test_find_persistent_root_volumes_empty_size_of_storage(self
):
3290 """Find persistent root volume, size of storage is empty."""
3291 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3292 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3293 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3294 "persistent-volume2",
3295 "persistent-root-volume",
3298 target_vdu
= target_vdu_wth_persistent_storage
3299 vdu_instantiation_volumes_list
= []
3301 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3302 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3304 self
.assertEqual(persist_root_disk
, None)
3305 self
.assertEqual(disk_list
, [])
3307 def test_find_persistent_root_empty_disk_list(self
):
3308 """Find persistent root volume, empty disk list."""
3309 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3310 target_vdu
= target_vdu_wth_persistent_storage
3311 vdu_instantiation_volumes_list
= []
3313 expected_persist_root_disk
= {
3314 "persistent-root-volume": {
3315 "image_id": "ubuntu20.04",
3319 expected_disk_list
= [
3321 "image_id": "ubuntu20.04",
3325 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3326 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3328 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3329 self
.assertEqual(disk_list
, expected_disk_list
)
3330 self
.assertEqual(len(disk_list
), 1)
def test_find_persistent_root_volumes_target_vdu_mismatch(self):
    """Find persistent root volume, target vdu name is not matching.

    The first VDU of a private copy of the VNFD fixture is renamed to
    "Several_Volumes-VM" so that it no longer matches the target VDU
    (presumably the name comparison is case-sensitive; confirm against
    Ns.find_persistent_root_volumes). The call is expected to return None
    and to leave the disk list empty.
    """
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["vdu"][0]["name"] = "Several_Volumes-VM"
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    disk_list = []
    result = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # assertIsNone states the intent directly and gives a clearer failure
    # message than assertEqual(result, None).
    self.assertIsNone(result)
    # Equality with [] already implies a length of zero, so no separate
    # len() assertion is needed.
    self.assertEqual(disk_list, [])
3346 def test_find_persistent_root_volumes_with_instantiation_vol_list(self
):
3347 """Find persistent root volume, existing volume needs to be used."""
3348 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3349 target_vdu
= target_vdu_wth_persistent_storage
3350 vdu_instantiation_volumes_list
= [
3352 "vim-volume-id": vim_volume_id
,
3353 "name": "persistent-root-volume",
3357 expected_persist_root_disk
= {
3358 "persistent-root-volume": {
3359 "vim_volume_id": vim_volume_id
,
3360 "image_id": "ubuntu20.04",
3363 expected_disk_list
= [
3365 "vim_volume_id": vim_volume_id
,
3366 "image_id": "ubuntu20.04",
3369 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3370 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3372 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3373 self
.assertEqual(disk_list
, expected_disk_list
)
3374 self
.assertEqual(len(disk_list
), 1)
def test_find_persistent_root_volumes_invalid_instantiation_params(self):
    """Find persistent root volume, existing volume id keyword is invalid.

    The instantiation volume entry carries "volume-id" where the other
    tests in this class pass "vim-volume-id"; the lookup is expected to
    raise KeyError and the disk list must stay empty.
    """
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    vdu = target_vdu_wth_persistent_storage
    instantiation_volumes = [
        {"volume-id": vim_volume_id, "name": "persistent-root-volume"}
    ]
    disks = []

    self.assertRaises(
        KeyError,
        self.ns.find_persistent_root_volumes,
        descriptor,
        vdu,
        instantiation_volumes,
        disks,
    )

    # Nothing may have been appended before the failure.
    self.assertEqual(disks, [])
    self.assertEqual(len(disks), 0)
3395 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3398 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3399 persistent_root_disk
= {
3400 "persistent-root-volume": {
3401 "image_id": "ubuntu20.04",
3406 target_vdu
= target_vdu_wth_persistent_storage
3407 vdu_instantiation_volumes_list
= []
3410 "image_id": "ubuntu20.04",
3415 expected_disk_list
= [
3417 "image_id": "ubuntu20.04",
3424 self
.ns
.find_persistent_volumes(
3425 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3427 self
.assertEqual(disk_list
, expected_disk_list
)
3429 def test_find_persistent_volumes_vdu_wth_inst_vol_list(self
):
3430 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3431 persistent_root_disk
= {
3432 "persistent-root-volume": {
3433 "image_id": "ubuntu20.04",
3437 vdu_instantiation_volumes_list
= [
3439 "vim-volume-id": vim_volume_id
,
3440 "name": "persistent-volume2",
3443 target_vdu
= target_vdu_wth_persistent_storage
3446 "image_id": "ubuntu20.04",
3450 expected_disk_list
= [
3452 "image_id": "ubuntu20.04",
3456 "vim_volume_id": vim_volume_id
,
3459 self
.ns
.find_persistent_volumes(
3460 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3462 self
.assertEqual(disk_list
, expected_disk_list
)
def test_find_persistent_volumes_vdu_wthout_persistent_storage(self):
    """Find persistent ordinary volume, there is not any persistent disk.

    With no persistent root disk, no instantiation volumes and the target
    VDU fixture without persistent storage, the disk list passed in must
    still be empty after the call.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    target_vdu = target_vdu_wthout_persistent_storage
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # The list used to be compared with itself, which can never fail;
    # compare against the expected (empty) content instead.
    self.assertEqual(disk_list, [])
3475 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3478 """There is persistent root disk, but there is not ordinary persistent disk."""
3479 persistent_root_disk
= {
3480 "persistent-root-volume": {
3481 "image_id": "ubuntu20.04",
3485 vdu_instantiation_volumes_list
= []
3486 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3487 target_vdu
["virtual-storages"] = [
3489 "id": "persistent-root-volume",
3490 "size-of-storage": "10",
3491 "type-of-storage": "persistent-storage:persistent-storage",
3494 "id": "ephemeral-volume",
3495 "size-of-storage": "1",
3496 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3501 "image_id": "ubuntu20.04",
3505 self
.ns
.find_persistent_volumes(
3506 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3508 self
.assertEqual(disk_list
, disk_list
)
3510 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(self
):
3511 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3512 vim-volume-id is given as instantiation parameter but disk id is not matching.
3514 vdu_instantiation_volumes_list
= [
3516 "vim-volume-id": vim_volume_id
,
3517 "name": "persistent-volume3",
3520 persistent_root_disk
= {
3521 "persistent-root-volume": {
3522 "image_id": "ubuntu20.04",
3528 "image_id": "ubuntu20.04",
3532 expected_disk_list
= [
3534 "image_id": "ubuntu20.04",
3542 target_vdu
= target_vdu_wth_persistent_storage
3543 self
.ns
.find_persistent_volumes(
3544 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3546 self
.assertEqual(disk_list
, expected_disk_list
)
3548 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3549 """Interfaces are sorted according to position, all have positions."""
3550 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3551 target_vdu
["interfaces"] = [
3554 "ns-vld-id": "datanet",
3559 "ns-vld-id": "mgmtnet",
3563 sorted_interfaces
= [
3566 "ns-vld-id": "mgmtnet",
3571 "ns-vld-id": "datanet",
3575 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3576 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3578 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3579 """Interfaces are sorted according to position, some of them have positions."""
3580 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3581 target_vdu
["interfaces"] = [
3584 "ns-vld-id": "mgmtnet",
3588 "ns-vld-id": "datanet",
3592 sorted_interfaces
= [
3595 "ns-vld-id": "datanet",
3600 "ns-vld-id": "mgmtnet",
3603 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3604 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting a target VDU whose interface list is empty leaves it empty."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [])
3614 def test_partially_locate_vdu_interfaces(self
):
3615 """Some interfaces have positions."""
3616 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3617 target_vdu
["interfaces"] = [
3620 "ns-vld-id": "net1",
3622 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3625 "ns-vld-id": "mgmtnet",
3629 "ns-vld-id": "datanet",
3633 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3634 self
.assertDictEqual(
3635 target_vdu
["interfaces"][0],
3638 "ns-vld-id": "datanet",
3642 self
.assertDictEqual(
3643 target_vdu
["interfaces"][2],
3644 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3647 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3648 """Some interfaces have positions, position start from 0."""
3649 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3650 target_vdu
["interfaces"] = [
3653 "ns-vld-id": "net1",
3655 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3658 "ns-vld-id": "mgmtnet",
3662 "ns-vld-id": "datanet",
3666 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3667 self
.assertDictEqual(
3668 target_vdu
["interfaces"][0],
3671 "ns-vld-id": "datanet",
3675 self
.assertDictEqual(
3676 target_vdu
["interfaces"][3],
3677 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Interfaces do not have positions.

    The interface list handed to ``_partially_locate_vdu_interfaces`` is
    expected to compare equal to the list that went in.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Give the VDU its own copy of the module-level fixture: the method under
    # test receives this list, and any in-place change it makes must not be
    # visible to other tests that read ``interfaces_wthout_positions``.
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(interfaces_wthout_positions)
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_result)
3688 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3689 """All interfaces have position."""
3690 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3691 target_vdu
["interfaces"] = interfaces_wth_all_positions
3692 expected_interfaces
= [
3695 "ns-vld-id": "net2",
3700 "ns-vld-id": "mgmtnet",
3705 "ns-vld-id": "net1",
3709 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3710 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3712 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3713 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3714 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3715 """Target_vdu has cloud-init and boot-data-drive."""
3716 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3717 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3718 target_vdu
["boot-data-drive"] = "vda"
3720 mock_get_cloud_init
.return_value
= cloud_init_content
3721 mock_parse_jinja2
.return_value
= user_data
3723 "user-data": user_data
,
3724 "boot-data-drive": "vda",
3726 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3727 self
.assertDictEqual(result
, expected_result
)
3728 mock_get_cloud_init
.assert_called_once_with(
3729 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3731 mock_parse_jinja2
.assert_called_once_with(
3732 cloud_init_content
=cloud_init_content
,
3734 context
="sample-cloud-init-path",
3737 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3738 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3739 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3740 self
, mock_parse_jinja2
, mock_get_cloud_init
3742 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3743 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3744 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3745 target_vdu
["boot-data-drive"] = "vda"
3747 mock_get_cloud_init
.side_effect
= NsException(
3748 "Mismatch descriptor for cloud init."
3751 with self
.assertRaises(NsException
) as err
:
3752 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3753 self
.assertEqual(str(err
.exception
), "Mismatch descriptor for cloud init.")
3755 mock_get_cloud_init
.assert_called_once_with(
3756 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3758 mock_parse_jinja2
.assert_not_called()
3760 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3761 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3762 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3763 self
, mock_parse_jinja2
, mock_get_cloud_init
3765 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3766 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3767 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3768 target_vdu
["boot-data-drive"] = "vda"
3770 mock_get_cloud_init
.return_value
= cloud_init_content
3771 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3773 with self
.assertRaises(NsException
) as err
:
3774 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3775 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3776 mock_get_cloud_init
.assert_called_once_with(
3777 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3779 mock_parse_jinja2
.assert_called_once_with(
3780 cloud_init_content
=cloud_init_content
,
3782 context
="sample-cloud-init-path",
3785 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3786 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3787 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3788 self
, mock_parse_jinja2
, mock_get_cloud_init
3790 """Target_vdu has cloud-init but do not have boot-data-drive."""
3791 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3792 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3794 mock_get_cloud_init
.return_value
= cloud_init_content
3795 mock_parse_jinja2
.return_value
= user_data
3797 "user-data": user_data
,
3799 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3800 self
.assertDictEqual(result
, expected_result
)
3801 mock_get_cloud_init
.assert_called_once_with(
3802 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3804 mock_parse_jinja2
.assert_called_once_with(
3805 cloud_init_content
=cloud_init_content
,
3807 context
="sample-cloud-init-path",
3810 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3811 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3812 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
3813 self
, mock_parse_jinja2
, mock_get_cloud_init
3815 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
3816 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3817 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3818 target_vdu
["boot-data-drive"] = "vda"
3819 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
3820 mock_parse_jinja2
.return_value
= user_data
3822 "user-data": user_data
,
3823 "boot-data-drive": "vda",
3825 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3826 self
.assertDictEqual(result
, expected_result
)
3827 mock_get_cloud_init
.assert_not_called()
3828 mock_parse_jinja2
.assert_called_once_with(
3829 cloud_init_content
=cloud_init_content
,
3831 context
="sample-cloud-init-path",
3834 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3835 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3836 def test_prepare_vdu_cloud_init_no_cloud_init(
3837 self
, mock_parse_jinja2
, mock_get_cloud_init
3839 """Target_vdu do not have cloud-init."""
3840 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3841 target_vdu
["boot-data-drive"] = "vda"
3844 "boot-data-drive": "vda",
3846 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3847 self
.assertDictEqual(result
, expected_result
)
3848 mock_get_cloud_init
.assert_not_called()
3849 mock_parse_jinja2
.assert_not_called()
3851 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
3852 """ns_vld and vnf_vld both exist."""
3855 "ns-vld-id": "mgmtnet",
3856 "vnf-vld-id": "mgmt_cp_int",
3858 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
3859 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3860 interface
, ns_preffix
, vnf_preffix
3862 self
.assertEqual(result
, expected_result
)
3864 def test_check_vld_information_of_interfaces_empty_interfaces(self
):
3865 """Interface dict is empty."""
3867 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3868 interface
, ns_preffix
, vnf_preffix
3870 self
.assertEqual(result
, "")
3872 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
3873 """Interface dict has only vnf_vld."""
3876 "vnf-vld-id": "mgmt_cp_int",
3878 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
3879 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3880 interface
, ns_preffix
, vnf_preffix
3882 self
.assertEqual(result
, expected_result
)
3884 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
3887 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
3890 "vnf-vld-id": "mgmt_cp_int",
3893 with self
.assertRaises(Exception) as err
:
3894 self
.ns
._check
_vld
_information
_of
_interfaces
(
3895 interface
, ns_preffix
, vnf_preffix
3897 self
.assertEqual(type(err
), TypeError)
3899 def test_prepare_interface_port_security_has_security_details(self
):
3900 """Interface dict has port security details."""
3903 "ns-vld-id": "mgmtnet",
3904 "vnf-vld-id": "mgmt_cp_int",
3905 "port-security-enabled": True,
3906 "port-security-disable-strategy": "allow-address-pairs",
3908 expected_interface
= {
3910 "ns-vld-id": "mgmtnet",
3911 "vnf-vld-id": "mgmt_cp_int",
3912 "port_security": True,
3913 "port_security_disable_strategy": "allow-address-pairs",
3915 self
.ns
._prepare
_interface
_port
_security
(interface
)
3916 self
.assertDictEqual(interface
, expected_interface
)
3918 def test_prepare_interface_port_security_empty_interfaces(self
):
3919 """Interface dict is empty."""
3921 expected_interface
= {}
3922 self
.ns
._prepare
_interface
_port
_security
(interface
)
3923 self
.assertDictEqual(interface
, expected_interface
)
3925 def test_prepare_interface_port_security_wthout_port_security(self
):
3926 """Interface dict does not have port security details."""
3929 "ns-vld-id": "mgmtnet",
3930 "vnf-vld-id": "mgmt_cp_int",
3932 expected_interface
= {
3934 "ns-vld-id": "mgmtnet",
3935 "vnf-vld-id": "mgmt_cp_int",
3937 self
.ns
._prepare
_interface
_port
_security
(interface
)
3938 self
.assertDictEqual(interface
, expected_interface
)
3940 def test_create_net_item_of_interface_floating_ip_port_security(self
):
3941 """Interface dict has floating ip, port-security details."""
3944 "vcpi": "sample_vcpi",
3945 "port_security": True,
3946 "port_security_disable_strategy": "allow-address-pairs",
3947 "floating_ip": "10.1.1.12",
3948 "ns-vld-id": "mgmtnet",
3949 "vnf-vld-id": "mgmt_cp_int",
3951 net_text
= f
"{ns_preffix}"
3952 expected_net_item
= {
3954 "port_security": True,
3955 "port_security_disable_strategy": "allow-address-pairs",
3956 "floating_ip": "10.1.1.12",
3957 "net_id": f
"TASK-{ns_preffix}",
3960 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3961 self
.assertDictEqual(result
, expected_net_item
)
3963 def test_create_net_item_of_interface_invalid_net_text(self
):
3964 """net-text is invalid."""
3967 "vcpi": "sample_vcpi",
3968 "port_security": True,
3969 "port_security_disable_strategy": "allow-address-pairs",
3970 "floating_ip": "10.1.1.12",
3971 "ns-vld-id": "mgmtnet",
3972 "vnf-vld-id": "mgmt_cp_int",
3975 with self
.assertRaises(TypeError):
3976 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3978 def test_create_net_item_of_interface_empty_interface(self
):
3979 """Interface dict is empty."""
3981 net_text
= ns_preffix
3982 expected_net_item
= {
3983 "net_id": f
"TASK-{ns_preffix}",
3986 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3987 self
.assertDictEqual(result
, expected_net_item
)
3989 @patch("osm_ng_ro.ns.deep_get")
3990 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
3991 """Interface type is SR-IOV."""
3994 "vcpi": "sample_vcpi",
3995 "port_security": True,
3996 "port_security_disable_strategy": "allow-address-pairs",
3997 "floating_ip": "10.1.1.12",
3998 "ns-vld-id": "mgmtnet",
3999 "vnf-vld-id": "mgmt_cp_int",
4002 mock_deep_get
.return_value
= "SR-IOV"
4003 net_text
= ns_preffix
4005 expected_net_item
= {
4010 self
.ns
._prepare
_type
_of
_interface
(
4011 interface
, tasks_by_target_record_id
, net_text
, net_item
4013 self
.assertDictEqual(net_item
, expected_net_item
)
4016 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4018 mock_deep_get
.assert_called_once_with(
4019 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4022 @patch("osm_ng_ro.ns.deep_get")
4023 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4026 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4029 "vcpi": "sample_vcpi",
4030 "port_security": True,
4031 "port_security_disable_strategy": "allow-address-pairs",
4032 "floating_ip": "10.1.1.12",
4033 "ns-vld-id": "mgmtnet",
4034 "vnf-vld-id": "mgmt_cp_int",
4035 "type": "PCI-PASSTHROUGH",
4037 mock_deep_get
.return_value
= {}
4038 tasks_by_target_record_id
= {}
4039 net_text
= ns_preffix
4041 expected_net_item
= {
4043 "model": "PCI-PASSTHROUGH",
4044 "type": "PCI-PASSTHROUGH",
4046 self
.ns
._prepare
_type
_of
_interface
(
4047 interface
, tasks_by_target_record_id
, net_text
, net_item
4049 self
.assertDictEqual(net_item
, expected_net_item
)
4050 mock_deep_get
.assert_called_once_with(
4051 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4054 @patch("osm_ng_ro.ns.deep_get")
4055 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4056 """Interface type is mgmt."""
4059 "vcpi": "sample_vcpi",
4060 "port_security": True,
4061 "port_security_disable_strategy": "allow-address-pairs",
4062 "floating_ip": "10.1.1.12",
4063 "ns-vld-id": "mgmtnet",
4064 "vnf-vld-id": "mgmt_cp_int",
4067 tasks_by_target_record_id
= {}
4068 net_text
= ns_preffix
4070 expected_net_item
= {
4073 self
.ns
._prepare
_type
_of
_interface
(
4074 interface
, tasks_by_target_record_id
, net_text
, net_item
4076 self
.assertDictEqual(net_item
, expected_net_item
)
4077 mock_deep_get
.assert_not_called()
4079 @patch("osm_ng_ro.ns.deep_get")
4080 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4081 """Interface type is bridge."""
4084 "vcpi": "sample_vcpi",
4085 "port_security": True,
4086 "port_security_disable_strategy": "allow-address-pairs",
4087 "floating_ip": "10.1.1.12",
4088 "ns-vld-id": "mgmtnet",
4089 "vnf-vld-id": "mgmt_cp_int",
4091 tasks_by_target_record_id
= {}
4092 net_text
= ns_preffix
4094 expected_net_item
= {
4098 self
.ns
._prepare
_type
_of
_interface
(
4099 interface
, tasks_by_target_record_id
, net_text
, net_item
4101 self
.assertDictEqual(net_item
, expected_net_item
)
4102 mock_deep_get
.assert_not_called()
4104 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4105 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4106 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4107 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4108 def test_prepare_vdu_interfaces(
4110 mock_type_of_interface
,
4111 mock_item_of_interface
,
4113 mock_vld_information_of_interface
,
4115 """Prepare vdu interfaces successfully."""
4116 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4119 "ns-vld-id": "net1",
4120 "ip-address": "13.2.12.31",
4121 "mgmt-interface": True,
4125 "vnf-vld-id": "net2",
4126 "mac-address": "d0:94:66:ed:fc:e2",
4130 "ns-vld-id": "mgmtnet",
4132 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4134 "params": "test_params",
4135 "find_params": "test_find_params",
4139 net_text_1
= f
"{ns_preffix}:net1"
4140 net_text_2
= f
"{vnf_preffix}:net2"
4141 net_text_3
= f
"{ns_preffix}:mgmtnet"
4144 "net_id": f
"TASK-{ns_preffix}",
4149 "net_id": f
"TASK-{ns_preffix}",
4154 "net_id": f
"TASK-{ns_preffix}",
4157 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4158 mock_vld_information_of_interface
.side_effect
= [
4164 expected_extra_dict
= {
4165 "params": "test_params",
4166 "find_params": "test_find_params",
4167 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4168 "mgmt_vdu_interface": 0,
4170 updated_net_item1
= deepcopy(net_item_1
)
4171 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4172 updated_net_item2
= deepcopy(net_item_2
)
4173 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4174 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4175 self
.ns
._prepare
_vdu
_interfaces
(
4181 tasks_by_target_record_id
,
4184 _call_mock_vld_information_of_interface
= (
4185 mock_vld_information_of_interface
.call_args_list
4188 _call_mock_vld_information_of_interface
[0][0],
4189 (interface_1
, ns_preffix
, vnf_preffix
),
4192 _call_mock_vld_information_of_interface
[1][0],
4193 (interface_2
, ns_preffix
, vnf_preffix
),
4196 _call_mock_vld_information_of_interface
[2][0],
4197 (interface_3
, ns_preffix
, vnf_preffix
),
4200 _call_mock_port_security
= mock_port_security
.call_args_list
4201 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4202 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4203 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4205 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4206 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4207 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4208 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4210 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4212 _call_mock_type_of_interface
[0][0],
4213 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4216 _call_mock_type_of_interface
[1][0],
4217 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4220 _call_mock_type_of_interface
[2][0],
4221 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4223 self
.assertEqual(net_list
, expected_net_list
)
4224 self
.assertEqual(extra_dict
, expected_extra_dict
)
4225 self
.logger
.error
.assert_not_called()
4227 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4228 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4229 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4230 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4231 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4233 mock_type_of_interface
,
4234 mock_item_of_interface
,
4236 mock_vld_information_of_interface
,
4238 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4239 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4242 "ns-vld-id": "net1",
4243 "ip-address": "13.2.12.31",
4244 "mgmt-interface": True,
4248 "vnf-vld-id": "net2",
4249 "mac-address": "d0:94:66:ed:fc:e2",
4253 "ns-vld-id": "mgmtnet",
4255 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4257 "params": "test_params",
4258 "find_params": "test_find_params",
4261 net_text_1
= f
"{ns_preffix}:net1"
4262 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4264 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4266 expected_extra_dict
= {
4267 "params": "test_params",
4268 "find_params": "test_find_params",
4269 "depends_on": [net_text_1
],
4271 with self
.assertRaises(TypeError):
4272 self
.ns
._prepare
_vdu
_interfaces
(
4278 tasks_by_target_record_id
,
4282 _call_mock_vld_information_of_interface
= (
4283 mock_vld_information_of_interface
.call_args_list
4286 _call_mock_vld_information_of_interface
[0][0],
4287 (interface_1
, ns_preffix
, vnf_preffix
),
4290 _call_mock_port_security
= mock_port_security
.call_args_list
4291 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4293 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4294 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4296 mock_type_of_interface
.assert_not_called()
4297 self
.logger
.error
.assert_not_called()
4298 self
.assertEqual(net_list
, [])
4299 self
.assertEqual(extra_dict
, expected_extra_dict
)
4301 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4302 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4303 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4304 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4305 def test_prepare_vdu_interfaces_vld_information_is_empty(
4307 mock_type_of_interface
,
4308 mock_item_of_interface
,
4310 mock_vld_information_of_interface
,
4312 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4313 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4316 "ns-vld-id": "net1",
4317 "ip-address": "13.2.12.31",
4318 "mgmt-interface": True,
4322 "vnf-vld-id": "net2",
4323 "mac-address": "d0:94:66:ed:fc:e2",
4327 "ns-vld-id": "mgmtnet",
4329 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4331 "params": "test_params",
4332 "find_params": "test_find_params",
4335 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4337 self
.ns
._prepare
_vdu
_interfaces
(
4343 tasks_by_target_record_id
,
4347 _call_mock_vld_information_of_interface
= (
4348 mock_vld_information_of_interface
.call_args_list
4351 _call_mock_vld_information_of_interface
[0][0],
4352 (interface_1
, ns_preffix
, vnf_preffix
),
4355 _call_mock_vld_information_of_interface
[1][0],
4356 (interface_2
, ns_preffix
, vnf_preffix
),
4359 _call_mock_vld_information_of_interface
[2][0],
4360 (interface_3
, ns_preffix
, vnf_preffix
),
4363 _call_logger
= self
.logger
.error
.call_args_list
4366 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4370 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4374 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4376 self
.assertEqual(net_list
, [])
4380 "params": "test_params",
4381 "find_params": "test_find_params",
4386 mock_item_of_interface
.assert_not_called()
4387 mock_port_security
.assert_not_called()
4388 mock_type_of_interface
.assert_not_called()
4390 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4391 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4392 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4393 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4394 def test_prepare_vdu_interfaces_empty_interface_list(
4396 mock_type_of_interface
,
4397 mock_item_of_interface
,
4399 mock_vld_information_of_interface
,
4401 """Prepare vdu interfaces, interface list is empty."""
4402 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4403 target_vdu
["interfaces"] = []
4406 self
.ns
._prepare
_vdu
_interfaces
(
4412 tasks_by_target_record_id
,
4415 mock_type_of_interface
.assert_not_called()
4416 mock_vld_information_of_interface
.assert_not_called()
4417 mock_item_of_interface
.assert_not_called()
4418 mock_port_security
.assert_not_called()
4420 def test_prepare_vdu_ssh_keys(self
):
4421 """Target_vdu has ssh-keys and ro_nsr_public_key exists."""
4422 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4423 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4424 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4425 target_vdu
["ssh-access-required"] = True
4427 expected_cloud_config
= {
4428 "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]
4430 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4431 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4433 def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self
):
4434 """Target_vdu does not have ssh-keys."""
4435 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4436 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4437 target_vdu
["ssh-access-required"] = True
4439 expected_cloud_config
= {"key-pairs": [{"public_key": "path_of_public_key"}]}
4440 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4441 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4443 def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self
):
4444 """Target_vdu has ssh-keys, ssh-access is not required."""
4445 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4446 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4447 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4448 target_vdu
["ssh-access-required"] = False
4450 expected_cloud_config
= {"key-pairs": ["sample-ssh-key"]}
4451 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4452 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4454 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4455 def test_add_persistent_root_disk_to_disk_list(
4456 self
, mock_select_persistent_root_disk
4458 """Add persistent root disk to disk_list"""
4460 "id": "persistent-root-volume",
4461 "type-of-storage": "persistent-storage:persistent-storage",
4462 "size-of-storage": "10",
4464 mock_select_persistent_root_disk
.return_value
= root_disk
4465 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4466 vnfd
["virtual-storage-desc"][1] = root_disk
4467 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4468 persistent_root_disk
= {}
4470 expected_disk_list
= [
4472 "image_id": "ubuntu20.04",
4476 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4477 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4479 self
.assertEqual(disk_list
, expected_disk_list
)
4480 mock_select_persistent_root_disk
.assert_called_once()
4482 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4483 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4484 self
, mock_select_persistent_root_disk
4486 """Add persistent root disk to disk_list"""
4488 "id": "persistent-root-volume",
4489 "type-of-storage": "persistent-storage:persistent-storage",
4490 "size-of-storage": "10",
4492 mock_select_persistent_root_disk
.side_effect
= AttributeError
4493 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4494 vnfd
["virtual-storage-desc"][1] = root_disk
4495 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4496 persistent_root_disk
= {}
4498 with self
.assertRaises(AttributeError):
4499 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4500 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4502 self
.assertEqual(disk_list
, [])
4503 mock_select_persistent_root_disk
.assert_called_once()
4505 def test_add_persistent_ordinary_disk_to_disk_list(self
):
4506 """Add persistent ordinary disk to disk_list"""
4507 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4508 persistent_root_disk
= {
4509 "persistent-root-volume": {
4510 "image_id": "ubuntu20.04",
4514 persistent_ordinary_disk
= {}
4516 expected_disk_list
= [
4521 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4522 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4524 self
.assertEqual(disk_list
, expected_disk_list
)
4526 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(self
):
4527 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4528 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4529 persistent_root_disk
= {
4530 "persistent-root-volume": {
4531 "image_id": "ubuntu20.04",
4534 "persistent-volume2": {
4538 persistent_ordinary_disk
= {}
4541 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4542 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4544 self
.assertEqual(disk_list
, [])
4546 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4547 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
4548 self
, mock_select_persistent_root_disk
4550 """VNFD does not have persistent storage."""
4551 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4552 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4553 mock_select_persistent_root_disk
.return_value
= None
4554 persistent_root_disk
= {}
4556 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4557 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4559 self
.assertEqual(disk_list
, [])
4560 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4562 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4563 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
4564 self
, mock_select_persistent_root_disk
4566 """Persistent_root_disk dict is empty."""
4567 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4568 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4569 mock_select_persistent_root_disk
.return_value
= None
4570 persistent_root_disk
= {}
4572 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4573 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4575 self
.assertEqual(disk_list
, [])
4576 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4578 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self
):
4579 """Invalid extra dict."""
4580 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4581 target_vdu
["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4583 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4584 with self
.assertRaises(NsException
) as err
:
4585 self
.ns
._prepare
_vdu
_affinity
_group
_list
(target_vdu
, extra_dict
, ns_preffix
)
4586 self
.assertEqual(str(err
.exception
), "Invalid extra_dict format.")
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A VDU with one affinity group yields one TASK entry and one dependency."""
    prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_ref = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    dependencies = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, prefix
    )

    # The dict passed in is checked after the call: it must now list the group.
    self.assertDictEqual(dependencies, {"depends_on": [group_ref]})
    self.assertEqual(affinity_groups, [{"affinity_group_id": "TASK-" + group_ref}])
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """A VDU with two affinity groups yields two TASK entries, in order."""
    prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_refs = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2",
    ]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    dependencies = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, prefix
    )

    # Both the dependency list and the returned entries follow the input order.
    self.assertDictEqual(dependencies, {"depends_on": list(group_refs)})
    self.assertEqual(
        affinity_groups,
        [{"affinity_group_id": "TASK-" + ref} for ref in group_refs],
    )
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """A VDU without affinity groups yields no entries and no dependencies."""
    prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    dependencies = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, prefix
    )

    # The dependency dict must come back exactly as it was passed in.
    self.assertDictEqual(dependencies, {"depends_on": []})
    self.assertEqual(affinity_groups, [])
4636 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4637 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4638 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4639 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4640 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4641 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4642 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4643 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4644 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4645 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4646 def test_process_vdu_params_with_inst_vol_list(
4648 mock_prepare_vdu_affinity_group_list
,
4649 mock_add_persistent_ordinary_disks_to_disk_list
,
4650 mock_add_persistent_root_disk_to_disk_list
,
4651 mock_find_persistent_volumes
,
4652 mock_find_persistent_root_volumes
,
4653 mock_prepare_vdu_ssh_keys
,
4654 mock_prepare_vdu_cloud_init
,
4655 mock_prepare_vdu_interfaces
,
4656 mock_locate_vdu_interfaces
,
4657 mock_sort_vdu_interfaces
,
4659 """Instantiation volume list is empty."""
4660 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4662 target_vdu
["interfaces"] = interfaces_wth_all_positions
4664 vdu_instantiation_vol_list
= [
4666 "vim-volume-id": vim_volume_id
,
4667 "name": "persistent-volume2",
4670 target_vdu
["additionalParams"] = {
4671 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4673 mock_prepare_vdu_cloud_init
.return_value
= {}
4674 mock_prepare_vdu_affinity_group_list
.return_value
= []
4675 persistent_root_disk
= {
4676 "persistent-root-volume": {
4677 "image_id": "ubuntu20.04",
4681 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4683 new_kwargs
= deepcopy(kwargs
)
4688 "tasks_by_target_record_id": {},
4692 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4693 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4694 db
.get_one
.return_value
= vnfd
4695 result
= Ns
._process
_vdu
_params
(
4696 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4698 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4699 mock_locate_vdu_interfaces
.assert_not_called()
4700 mock_prepare_vdu_cloud_init
.assert_called_once()
4701 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4702 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4703 mock_prepare_vdu_interfaces
.assert_called_once_with(
4705 expected_extra_dict_copy
,
4712 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4713 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4714 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4715 mock_find_persistent_volumes
.assert_called_once_with(
4716 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
4719 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4720 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4721 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4722 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4723 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4724 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4725 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4726 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4727 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4728 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4729 def test_process_vdu_params_wth_affinity_groups(
4731 mock_prepare_vdu_affinity_group_list
,
4732 mock_add_persistent_ordinary_disks_to_disk_list
,
4733 mock_add_persistent_root_disk_to_disk_list
,
4734 mock_find_persistent_volumes
,
4735 mock_find_persistent_root_volumes
,
4736 mock_prepare_vdu_ssh_keys
,
4737 mock_prepare_vdu_cloud_init
,
4738 mock_prepare_vdu_interfaces
,
4739 mock_locate_vdu_interfaces
,
4740 mock_sort_vdu_interfaces
,
4742 """There is cloud-config."""
4743 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4746 target_vdu
["interfaces"] = interfaces_wth_all_positions
4747 mock_prepare_vdu_cloud_init
.return_value
= {}
4748 mock_prepare_vdu_affinity_group_list
.return_value
= [
4753 new_kwargs
= deepcopy(kwargs
)
4758 "tasks_by_target_record_id": {},
4762 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4763 expected_extra_dict3
["params"]["affinity_group_list"] = [
4767 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4768 db
.get_one
.return_value
= vnfd
4769 result
= Ns
._process
_vdu
_params
(
4770 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4772 self
.assertDictEqual(result
, expected_extra_dict3
)
4773 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4774 mock_locate_vdu_interfaces
.assert_not_called()
4775 mock_prepare_vdu_cloud_init
.assert_called_once()
4776 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4777 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4778 mock_prepare_vdu_interfaces
.assert_called_once_with(
4780 expected_extra_dict3
,
4788 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4789 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4790 mock_find_persistent_volumes
.assert_not_called()
4792 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4793 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4794 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4795 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4796 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4797 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4798 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4799 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4800 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4801 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4802 def test_process_vdu_params_wth_cloud_config(
4804 mock_prepare_vdu_affinity_group_list
,
4805 mock_add_persistent_ordinary_disks_to_disk_list
,
4806 mock_add_persistent_root_disk_to_disk_list
,
4807 mock_find_persistent_volumes
,
4808 mock_find_persistent_root_volumes
,
4809 mock_prepare_vdu_ssh_keys
,
4810 mock_prepare_vdu_cloud_init
,
4811 mock_prepare_vdu_interfaces
,
4812 mock_locate_vdu_interfaces
,
4813 mock_sort_vdu_interfaces
,
4815 """There is cloud-config."""
4816 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4819 target_vdu
["interfaces"] = interfaces_wth_all_positions
4820 mock_prepare_vdu_cloud_init
.return_value
= {
4821 "user-data": user_data
,
4822 "boot-data-drive": "vda",
4824 mock_prepare_vdu_affinity_group_list
.return_value
= []
4826 new_kwargs
= deepcopy(kwargs
)
4831 "tasks_by_target_record_id": {},
4835 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4836 expected_extra_dict3
["params"]["cloud_config"] = {
4837 "user-data": user_data
,
4838 "boot-data-drive": "vda",
4840 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4841 db
.get_one
.return_value
= vnfd
4842 result
= Ns
._process
_vdu
_params
(
4843 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4845 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4846 mock_locate_vdu_interfaces
.assert_not_called()
4847 mock_prepare_vdu_cloud_init
.assert_called_once()
4848 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4849 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4850 mock_prepare_vdu_interfaces
.assert_called_once_with(
4852 expected_extra_dict3
,
4859 self
.assertDictEqual(result
, expected_extra_dict3
)
4860 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
4861 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
4863 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4864 mock_find_persistent_volumes
.assert_not_called()
4866 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4867 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4868 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4869 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4870 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4871 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4872 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4873 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4874 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4875 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4876 def test_process_vdu_params_wthout_persistent_storage(
4878 mock_prepare_vdu_affinity_group_list
,
4879 mock_add_persistent_ordinary_disks_to_disk_list
,
4880 mock_add_persistent_root_disk_to_disk_list
,
4881 mock_find_persistent_volumes
,
4882 mock_find_persistent_root_volumes
,
4883 mock_prepare_vdu_ssh_keys
,
4884 mock_prepare_vdu_cloud_init
,
4885 mock_prepare_vdu_interfaces
,
4886 mock_locate_vdu_interfaces
,
4887 mock_sort_vdu_interfaces
,
4889 """There is not any persistent storage."""
4890 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4893 target_vdu
["interfaces"] = interfaces_wth_all_positions
4894 mock_prepare_vdu_cloud_init
.return_value
= {}
4895 mock_prepare_vdu_affinity_group_list
.return_value
= []
4897 new_kwargs
= deepcopy(kwargs
)
4902 "tasks_by_target_record_id": {},
4906 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
4907 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4908 db
.get_one
.return_value
= vnfd
4909 result
= Ns
._process
_vdu
_params
(
4910 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4912 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4913 mock_locate_vdu_interfaces
.assert_not_called()
4914 mock_prepare_vdu_cloud_init
.assert_called_once()
4915 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4916 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4917 mock_prepare_vdu_interfaces
.assert_called_once_with(
4919 expected_extra_dict_copy
,
4926 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4927 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4928 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4929 mock_find_persistent_volumes
.assert_not_called()
4931 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4932 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4933 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4934 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4935 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4936 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4937 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4938 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4939 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4940 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4941 def test_process_vdu_params_interfaces_partially_located(
4943 mock_prepare_vdu_affinity_group_list
,
4944 mock_add_persistent_ordinary_disks_to_disk_list
,
4945 mock_add_persistent_root_disk_to_disk_list
,
4946 mock_find_persistent_volumes
,
4947 mock_find_persistent_root_volumes
,
4948 mock_prepare_vdu_ssh_keys
,
4949 mock_prepare_vdu_cloud_init
,
4950 mock_prepare_vdu_interfaces
,
4951 mock_locate_vdu_interfaces
,
4952 mock_sort_vdu_interfaces
,
4954 """Some interfaces have position."""
4955 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4958 target_vdu
["interfaces"] = [
4961 "ns-vld-id": "net1",
4963 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
4966 "ns-vld-id": "mgmtnet",
4969 mock_prepare_vdu_cloud_init
.return_value
= {}
4970 mock_prepare_vdu_affinity_group_list
.return_value
= []
4971 persistent_root_disk
= {
4972 "persistent-root-volume": {
4973 "image_id": "ubuntu20.04",
4977 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4979 new_kwargs
= deepcopy(kwargs
)
4984 "tasks_by_target_record_id": {},
4989 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4990 db
.get_one
.return_value
= vnfd
4991 result
= Ns
._process
_vdu
_params
(
4992 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4994 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4995 mock_sort_vdu_interfaces
.assert_not_called()
4996 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
4997 mock_prepare_vdu_cloud_init
.assert_called_once()
4998 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4999 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5000 mock_prepare_vdu_interfaces
.assert_called_once_with(
5002 expected_extra_dict_copy
,
5009 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5010 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5011 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5012 mock_find_persistent_volumes
.assert_not_called()
5013 mock_find_persistent_root_volumes
.assert_not_called()
5015 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5016 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5017 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5018 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5019 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5020 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5021 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5022 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5023 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5024 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5025 def test_process_vdu_params_no_interface_position(
5027 mock_prepare_vdu_affinity_group_list
,
5028 mock_add_persistent_ordinary_disks_to_disk_list
,
5029 mock_add_persistent_root_disk_to_disk_list
,
5030 mock_find_persistent_volumes
,
5031 mock_find_persistent_root_volumes
,
5032 mock_prepare_vdu_ssh_keys
,
5033 mock_prepare_vdu_cloud_init
,
5034 mock_prepare_vdu_interfaces
,
5035 mock_locate_vdu_interfaces
,
5036 mock_sort_vdu_interfaces
,
5038 """Interfaces do not have position."""
5039 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5042 target_vdu
["interfaces"] = interfaces_wthout_positions
5043 mock_prepare_vdu_cloud_init
.return_value
= {}
5044 mock_prepare_vdu_affinity_group_list
.return_value
= []
5045 persistent_root_disk
= {
5046 "persistent-root-volume": {
5047 "image_id": "ubuntu20.04",
5051 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5052 new_kwargs
= deepcopy(kwargs
)
5057 "tasks_by_target_record_id": {},
5062 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5063 db
.get_one
.return_value
= vnfd
5064 result
= Ns
._process
_vdu
_params
(
5065 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5067 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5068 mock_sort_vdu_interfaces
.assert_not_called()
5069 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5070 mock_prepare_vdu_cloud_init
.assert_called_once()
5071 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5072 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5073 mock_prepare_vdu_interfaces
.assert_called_once_with(
5075 expected_extra_dict_copy
,
5082 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5083 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5084 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5085 mock_find_persistent_volumes
.assert_not_called()
5086 mock_find_persistent_root_volumes
.assert_not_called()
5088 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5089 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5090 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5091 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5092 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5093 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5094 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5095 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5096 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5097 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5098 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5100 mock_prepare_vdu_affinity_group_list
,
5101 mock_add_persistent_ordinary_disks_to_disk_list
,
5102 mock_add_persistent_root_disk_to_disk_list
,
5103 mock_find_persistent_volumes
,
5104 mock_find_persistent_root_volumes
,
5105 mock_prepare_vdu_ssh_keys
,
5106 mock_prepare_vdu_cloud_init
,
5107 mock_prepare_vdu_interfaces
,
5108 mock_locate_vdu_interfaces
,
5109 mock_sort_vdu_interfaces
,
5111 """Prepare vdu interfaces method raises exception."""
5112 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5115 target_vdu
["interfaces"] = interfaces_wthout_positions
5116 mock_prepare_vdu_cloud_init
.return_value
= {}
5117 mock_prepare_vdu_affinity_group_list
.return_value
= []
5118 persistent_root_disk
= {
5119 "persistent-root-volume": {
5120 "image_id": "ubuntu20.04",
5124 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5125 new_kwargs
= deepcopy(kwargs
)
5130 "tasks_by_target_record_id": {},
5134 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5136 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5137 db
.get_one
.return_value
= vnfd
5138 with self
.assertRaises(Exception) as err
:
5139 Ns
._process
_vdu
_params
(
5140 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5142 self
.assertEqual(type(err
), TypeError)
5143 mock_sort_vdu_interfaces
.assert_not_called()
5144 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5145 mock_prepare_vdu_cloud_init
.assert_not_called()
5146 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5147 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5148 mock_prepare_vdu_interfaces
.assert_called_once()
5149 mock_prepare_vdu_ssh_keys
.assert_not_called()
5150 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5151 mock_find_persistent_volumes
.assert_not_called()
5152 mock_find_persistent_root_volumes
.assert_not_called()
5154 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5155 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5156 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5157 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5158 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5159 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5160 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5161 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5162 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5163 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5164 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5166 mock_prepare_vdu_affinity_group_list
,
5167 mock_add_persistent_ordinary_disks_to_disk_list
,
5168 mock_add_persistent_root_disk_to_disk_list
,
5169 mock_find_persistent_volumes
,
5170 mock_find_persistent_root_volumes
,
5171 mock_prepare_vdu_ssh_keys
,
5172 mock_prepare_vdu_cloud_init
,
5173 mock_prepare_vdu_interfaces
,
5174 mock_locate_vdu_interfaces
,
5175 mock_sort_vdu_interfaces
,
5177 """Add persistent root disk method raises exception."""
5178 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5181 target_vdu
["interfaces"] = interfaces_wthout_positions
5182 mock_prepare_vdu_cloud_init
.return_value
= {}
5183 mock_prepare_vdu_affinity_group_list
.return_value
= []
5184 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5185 new_kwargs
= deepcopy(kwargs
)
5190 "tasks_by_target_record_id": {},
5195 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5196 db
.get_one
.return_value
= vnfd
5197 with self
.assertRaises(Exception) as err
:
5198 Ns
._process
_vdu
_params
(
5199 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5201 self
.assertEqual(type(err
), KeyError)
5202 mock_sort_vdu_interfaces
.assert_not_called()
5203 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5204 mock_prepare_vdu_cloud_init
.assert_called_once()
5205 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5206 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5207 mock_prepare_vdu_interfaces
.assert_called_once_with(
5211 f
"{ns_preffix}:image.0",
5212 f
"{ns_preffix}:flavor.0",
5222 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5223 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5224 mock_find_persistent_volumes
.assert_not_called()
5225 mock_find_persistent_root_volumes
.assert_not_called()
5227 def test_select_persistent_root_disk(self
):
5228 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5229 vdu
["virtual-storage-desc"] = [
5230 "persistent-root-volume",
5231 "persistent-volume2",
5234 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5235 expected_result
= vsd
5236 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5237 self
.assertEqual(result
, expected_result
)
5239 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5240 """VDU first virtual-storage-desc is different than vsd id."""
5241 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5242 vdu
["virtual-storage-desc"] = [
5243 "persistent-volume2",
5244 "persistent-root-volume",
5247 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5248 expected_result
= None
5249 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5250 self
.assertEqual(result
, expected_result
)
5252 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5253 """vsd type is not persistent."""
5254 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5255 vdu
["virtual-storage-desc"] = [
5256 "persistent-volume2",
5257 "persistent-root-volume",
5260 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5261 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5262 expected_result
= None
5263 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5264 self
.assertEqual(result
, expected_result
)
5266 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5267 """vsd size is None."""
5268 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5269 vdu
["virtual-storage-desc"] = [
5270 "persistent-volume2",
5271 "persistent-root-volume",
5274 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5275 vsd
["size-of-storage"] = None
5276 expected_result
= None
5277 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5278 self
.assertEqual(result
, expected_result
)
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """The VDU fixture is used as-is, without a virtual-storage-desc override.

    No root disk is expected to be selected, so the result must equal None.
    """
    storage_descriptors = deepcopy(vnfd_wth_persistent_storage)[
        "virtual-storage-desc"
    ]
    # Second descriptor of the VNFD fixture (the "persistent-root-volume"
    # entry as listed at the top of this module).
    candidate_vsd = storage_descriptors[1]
    unmodified_vdu = deepcopy(target_vdu_wth_persistent_storage)

    selected = Ns._select_persistent_root_disk(candidate_vsd, unmodified_vdu)

    self.assertEqual(selected, None)
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """The vsd argument is a list where a dict is expected.

    Passing the whole ``virtual-storage-desc`` list instead of one of its
    entries must raise AttributeError.
    """
    target = deepcopy(target_vdu_wth_persistent_storage)
    # Deliberately the full list, not a single descriptor dict.
    all_descriptors = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]

    self.assertRaises(
        AttributeError,
        Ns._select_persistent_root_disk,
        all_descriptors,
        target,
    )