1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
76 "id": "ephemeral-volume",
77 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
78 "size-of-storage": "1",
84 "path": "/app/storage/",
# Identifier used by the tests as the id of an already existing VIM volume.
# NOTE(review): the value is an opaque sample string, not a real resource id.
vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3"
90 task_by_target_record_id
= {
91 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
92 "extra_dict": {"params": {"net_type": "SR-IOV"}}
95 interfaces_wthout_positions
= [
106 "ns-vld-id": "mgmtnet",
109 interfaces_wth_all_positions
= [
122 "ns-vld-id": "mgmtnet",
126 target_vdu_wth_persistent_storage
= {
127 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
130 "vdu-name": "several_volumes-VM",
134 "ns-vld-id": "mgmtnet",
137 "virtual-storages": [
139 "id": "persistent-volume2",
140 "size-of-storage": "10",
141 "type-of-storage": "persistent-storage:persistent-storage",
144 "id": "persistent-root-volume",
145 "size-of-storage": "10",
146 "type-of-storage": "persistent-storage:persistent-storage",
149 "id": "ephemeral-volume",
150 "size-of-storage": "1",
151 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
# Stand-ins for the collaborators handed to the code under test.
db = MagicMock(name="database mock")
# NOTE(review): ``fs`` carries the same "database mock" label as ``db``. The
# label only appears in the mock's repr, so behaviour is unaffected, but it
# looks like a copy-paste leftover -- confirm before renaming.
fs = MagicMock(name="database mock")

# Record-id prefixes: "<collection>:<record id>". The ids embedded here are the
# same values as ``nsr_id`` and ``vnfr_id`` below.
ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
162 "name": "sample_name",
164 expected_extra_dict
= {
166 f
"{ns_preffix}:image.0",
167 f
"{ns_preffix}:flavor.0",
170 "affinity_group_list": [],
171 "availability_zone_index": None,
172 "availability_zone_list": None,
173 "cloud_config": None,
174 "description": "several_volumes-VM",
176 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
177 "image_id": f
"TASK-{ns_preffix}:image.0",
178 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
184 expected_extra_dict2
= {
186 f
"{ns_preffix}:image.0",
187 f
"{ns_preffix}:flavor.0",
190 "affinity_group_list": [],
191 "availability_zone_index": None,
192 "availability_zone_list": None,
193 "cloud_config": None,
194 "description": "without_volumes-VM",
196 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
197 "image_id": f
"TASK-{ns_preffix}:image.0",
198 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
203 tasks_by_target_record_id
= {
204 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
207 "net_type": "SR-IOV",
214 "vdu2cloud_init": {},
216 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
217 "member-vnf-index-ref": "vnf-several-volumes",
220 vnfd_wthout_persistent_storage
= {
221 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
225 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
228 "id": "without_volumes-vnf",
229 "product-name": "without_volumes-vnf",
232 "id": "without_volumes-VM",
233 "name": "without_volumes-VM",
234 "sw-image-desc": "ubuntu20.04",
235 "alternative-sw-image-desc": [
239 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
243 "virtual-storage-desc": [
244 {"id": "root-volume", "size-of-storage": "10"},
246 "id": "ephemeral-volume",
247 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
248 "size-of-storage": "1",
254 "path": "/app/storage/",
260 target_vdu_wthout_persistent_storage
= {
261 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
264 "vdu-name": "without_volumes-VM",
268 "ns-vld-id": "mgmtnet",
271 "virtual-storages": [
274 "size-of-storage": "10",
277 "id": "ephemeral-volume",
278 "size-of-storage": "1",
279 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
283 cloud_init_content
= """
288 overwrite: {{is_override}}
291 - [ sh, -xc, "echo $(date) '{{command}}'" ]
302 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments of each call.

    A plain mock stores references to the objects it was called with, so if
    the caller mutates those objects afterwards the recorded call changes as
    well. Copying the arguments before delegating freezes them as they were
    at call time, which lets assertions check the values actually passed.
    """

    def __call__(self, *args, **kwargs):
        """Deep-copy ``args`` and ``kwargs``, then record and perform the call.

        Returns:
            Whatever the underlying ``MagicMock`` call returns.
        """
        # Copy first: the parent class stores exactly what it receives here.
        args = deepcopy(args)
        kwargs = deepcopy(kwargs)
        return super().__call__(*args, **kwargs)
313 class TestNs(unittest
.TestCase
):
317 def test__create_task_without_extra_dict(self
):
319 "target_id": "vim_openstack_1",
320 "action_id": "123456",
322 "task_id": "123456:1",
323 "status": "SCHEDULED",
326 "target_record": "test_target_record",
327 "target_record_id": "test_target_record_id",
330 "action_id": "123456",
335 task
= Ns
._create
_task
(
336 deployment_info
=deployment_info
,
337 target_id
="vim_openstack_1",
340 target_record
="test_target_record",
341 target_record_id
="test_target_record_id",
344 self
.assertEqual(deployment_info
.get("task_index"), 2)
345 self
.assertDictEqual(task
, expected_result
)
347 def test__create_task(self
):
349 "target_id": "vim_openstack_1",
350 "action_id": "123456",
352 "task_id": "123456:1",
353 "status": "SCHEDULED",
356 "target_record": "test_target_record",
357 "target_record_id": "test_target_record_id",
358 # values coming from extra_dict
359 "params": "test_params",
360 "find_params": "test_find_params",
361 "depends_on": "test_depends_on",
364 "action_id": "123456",
369 task
= Ns
._create
_task
(
370 deployment_info
=deployment_info
,
371 target_id
="vim_openstack_1",
374 target_record
="test_target_record",
375 target_record_id
="test_target_record_id",
377 "params": "test_params",
378 "find_params": "test_find_params",
379 "depends_on": "test_depends_on",
383 self
.assertEqual(deployment_info
.get("task_index"), 2)
384 self
.assertDictEqual(task
, expected_result
)
386 @patch("osm_ng_ro.ns.time")
387 def test__create_ro_task(self
, mock_time
: Mock
):
388 now
= 1637324838.994551
389 mock_time
.return_value
= now
391 "target_id": "vim_openstack_1",
392 "action_id": "123456",
394 "task_id": "123456:1",
395 "status": "SCHEDULED",
398 "target_record": "test_target_record",
399 "target_record_id": "test_target_record_id",
400 # values coming from extra_dict
401 "params": "test_params",
402 "find_params": "test_find_params",
403 "depends_on": "test_depends_on",
409 "target_id": "vim_openstack_1",
412 "created_items": None,
426 ro_task
= Ns
._create
_ro
_task
(
427 target_id
="vim_openstack_1",
431 self
.assertDictEqual(ro_task
, expected_result
)
433 def test__process_image_params_with_empty_target_image(self
):
439 result
= Ns
._process
_image
_params
(
440 target_image
=target_image
,
443 target_record_id
=None,
446 self
.assertDictEqual(expected_result
, result
)
448 def test__process_image_params_with_wrong_target_image(self
):
453 "no_image": "to_see_here",
456 result
= Ns
._process
_image
_params
(
457 target_image
=target_image
,
460 target_record_id
=None,
463 self
.assertDictEqual(expected_result
, result
)
465 def test__process_image_params_with_image(self
):
477 result
= Ns
._process
_image
_params
(
478 target_image
=target_image
,
481 target_record_id
=None,
484 self
.assertDictEqual(expected_result
, result
)
486 def test__process_image_params_with_vim_image_id(self
):
495 "vim_image_id": "123456",
498 result
= Ns
._process
_image
_params
(
499 target_image
=target_image
,
502 target_record_id
=None,
505 self
.assertDictEqual(expected_result
, result
)
507 def test__process_image_params_with_image_checksum(self
):
511 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
516 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
519 result
= Ns
._process
_image
_params
(
520 target_image
=target_image
,
523 target_record_id
=None,
526 self
.assertDictEqual(expected_result
, result
)
528 def test__get_resource_allocation_params_with_empty_target_image(self
):
530 quota_descriptor
= {}
532 result
= Ns
._get
_resource
_allocation
_params
(
533 quota_descriptor
=quota_descriptor
,
536 self
.assertDictEqual(expected_result
, result
)
538 def test__get_resource_allocation_params_with_wrong_target_image(self
):
541 "no_quota": "present_here",
544 result
= Ns
._get
_resource
_allocation
_params
(
545 quota_descriptor
=quota_descriptor
,
548 self
.assertDictEqual(expected_result
, result
)
550 def test__get_resource_allocation_params_with_limit(self
):
558 result
= Ns
._get
_resource
_allocation
_params
(
559 quota_descriptor
=quota_descriptor
,
562 self
.assertDictEqual(expected_result
, result
)
564 def test__get_resource_allocation_params_with_reserve(self
):
572 result
= Ns
._get
_resource
_allocation
_params
(
573 quota_descriptor
=quota_descriptor
,
576 self
.assertDictEqual(expected_result
, result
)
578 def test__get_resource_allocation_params_with_shares(self
):
586 result
= Ns
._get
_resource
_allocation
_params
(
587 quota_descriptor
=quota_descriptor
,
590 self
.assertDictEqual(expected_result
, result
)
592 def test__get_resource_allocation_params(self
):
604 result
= Ns
._get
_resource
_allocation
_params
(
605 quota_descriptor
=quota_descriptor
,
608 self
.assertDictEqual(expected_result
, result
)
610 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
611 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
619 result
= Ns
._process
_guest
_epa
_quota
_params
(
620 guest_epa_quota
=guest_epa_quota
,
621 epa_vcpu_set
=epa_vcpu_set
,
624 self
.assertDictEqual(expected_result
, result
)
625 self
.assertFalse(resource_allocation
.called
)
627 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
628 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
636 result
= Ns
._process
_guest
_epa
_quota
_params
(
637 guest_epa_quota
=guest_epa_quota
,
638 epa_vcpu_set
=epa_vcpu_set
,
641 self
.assertDictEqual(expected_result
, result
)
642 self
.assertFalse(resource_allocation
.called
)
644 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
645 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
651 "no-quota": "nothing",
655 result
= Ns
._process
_guest
_epa
_quota
_params
(
656 guest_epa_quota
=guest_epa_quota
,
657 epa_vcpu_set
=epa_vcpu_set
,
660 self
.assertDictEqual(expected_result
, result
)
661 self
.assertFalse(resource_allocation
.called
)
663 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
664 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
670 "no-quota": "nothing",
674 result
= Ns
._process
_guest
_epa
_quota
_params
(
675 guest_epa_quota
=guest_epa_quota
,
676 epa_vcpu_set
=epa_vcpu_set
,
679 self
.assertDictEqual(expected_result
, result
)
680 self
.assertFalse(resource_allocation
.called
)
682 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
683 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
697 result
= Ns
._process
_guest
_epa
_quota
_params
(
698 guest_epa_quota
=guest_epa_quota
,
699 epa_vcpu_set
=epa_vcpu_set
,
702 self
.assertDictEqual(expected_result
, result
)
703 self
.assertFalse(resource_allocation
.called
)
705 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
706 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
726 resource_allocation_param
= {
731 resource_allocation
.return_value
= {
737 result
= Ns
._process
_guest
_epa
_quota
_params
(
738 guest_epa_quota
=guest_epa_quota
,
739 epa_vcpu_set
=epa_vcpu_set
,
742 resource_allocation
.assert_called_once_with(resource_allocation_param
)
743 self
.assertDictEqual(expected_result
, result
)
745 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
746 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
766 resource_allocation_param
= {
771 resource_allocation
.return_value
= {
777 result
= Ns
._process
_guest
_epa
_quota
_params
(
778 guest_epa_quota
=guest_epa_quota
,
779 epa_vcpu_set
=epa_vcpu_set
,
782 resource_allocation
.assert_called_once_with(resource_allocation_param
)
783 self
.assertDictEqual(expected_result
, result
)
785 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
786 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
806 resource_allocation_param
= {
811 resource_allocation
.return_value
= {
817 result
= Ns
._process
_guest
_epa
_quota
_params
(
818 guest_epa_quota
=guest_epa_quota
,
819 epa_vcpu_set
=epa_vcpu_set
,
822 resource_allocation
.assert_called_once_with(resource_allocation_param
)
823 self
.assertDictEqual(expected_result
, result
)
825 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
826 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
846 resource_allocation_param
= {
851 resource_allocation
.return_value
= {
857 result
= Ns
._process
_guest
_epa
_quota
_params
(
858 guest_epa_quota
=guest_epa_quota
,
859 epa_vcpu_set
=epa_vcpu_set
,
862 resource_allocation
.assert_called_once_with(resource_allocation_param
)
863 self
.assertDictEqual(expected_result
, result
)
865 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
866 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
886 resource_allocation_param
= {
891 resource_allocation
.return_value
= {
897 result
= Ns
._process
_guest
_epa
_quota
_params
(
898 guest_epa_quota
=guest_epa_quota
,
899 epa_vcpu_set
=epa_vcpu_set
,
902 resource_allocation
.assert_called_once_with(resource_allocation_param
)
903 self
.assertDictEqual(expected_result
, result
)
905 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
906 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
926 resource_allocation_param
= {
931 resource_allocation
.return_value
= {
937 result
= Ns
._process
_guest
_epa
_quota
_params
(
938 guest_epa_quota
=guest_epa_quota
,
939 epa_vcpu_set
=epa_vcpu_set
,
942 resource_allocation
.assert_called_once_with(resource_allocation_param
)
943 self
.assertDictEqual(expected_result
, result
)
945 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
946 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
966 resource_allocation_param
= {
971 resource_allocation
.return_value
= {
977 result
= Ns
._process
_guest
_epa
_quota
_params
(
978 guest_epa_quota
=guest_epa_quota
,
979 epa_vcpu_set
=epa_vcpu_set
,
982 resource_allocation
.assert_called_once_with(resource_allocation_param
)
983 self
.assertDictEqual(expected_result
, result
)
985 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
986 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1031 resource_allocation
.return_value
= {
1037 result
= Ns
._process
_guest
_epa
_quota
_params
(
1038 guest_epa_quota
=guest_epa_quota
,
1039 epa_vcpu_set
=epa_vcpu_set
,
1042 self
.assertTrue(resource_allocation
.called
)
1043 self
.assertDictEqual(expected_result
, result
)
1045 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1046 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1048 resource_allocation
,
1094 epa_vcpu_set
= False
1096 resource_allocation
.return_value
= {
1102 result
= Ns
._process
_guest
_epa
_quota
_params
(
1103 guest_epa_quota
=guest_epa_quota
,
1104 epa_vcpu_set
=epa_vcpu_set
,
1107 self
.assertTrue(resource_allocation
.called
)
1108 self
.assertDictEqual(expected_result
, result
)
1110 def test__process_guest_epa_numa_params_with_empty_numa_params(self
):
1111 expected_numa_result
= []
1112 expected_epa_vcpu_set_result
= False
1113 guest_epa_quota
= {}
1115 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1116 guest_epa_quota
=guest_epa_quota
,
1118 self
.assertEqual(expected_numa_result
, numa_result
)
1119 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1121 def test__process_guest_epa_numa_params_with_wrong_numa_params(self
):
1122 expected_numa_result
= []
1123 expected_epa_vcpu_set_result
= False
1124 guest_epa_quota
= {"no_nume": "here"}
1126 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1127 guest_epa_quota
=guest_epa_quota
,
1130 self
.assertEqual(expected_numa_result
, numa_result
)
1131 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1133 def test__process_guest_epa_numa_params_with_numa_node_policy(self
):
1134 expected_numa_result
= []
1135 expected_epa_vcpu_set_result
= False
1136 guest_epa_quota
= {"numa-node-policy": {}}
1138 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1139 guest_epa_quota
=guest_epa_quota
,
1142 self
.assertEqual(expected_numa_result
, numa_result
)
1143 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1145 def test__process_guest_epa_numa_params_with_no_node(self
):
1146 expected_numa_result
= []
1147 expected_epa_vcpu_set_result
= False
1149 "numa-node-policy": {
1154 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1155 guest_epa_quota
=guest_epa_quota
,
1158 self
.assertEqual(expected_numa_result
, numa_result
)
1159 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1161 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1162 expected_numa_result
= [{"cores": 3}]
1163 expected_epa_vcpu_set_result
= True
1165 "numa-node-policy": {
1174 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1175 guest_epa_quota
=guest_epa_quota
,
1178 self
.assertEqual(expected_numa_result
, numa_result
)
1179 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1181 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1182 expected_numa_result
= [{"paired_threads": 3}]
1183 expected_epa_vcpu_set_result
= True
1185 "numa-node-policy": {
1188 "paired-threads": {"num-paired-threads": "3"},
1194 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1195 guest_epa_quota
=guest_epa_quota
,
1198 self
.assertEqual(expected_numa_result
, numa_result
)
1199 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1201 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1202 expected_numa_result
= [
1204 "paired-threads-id": [("0", "1"), ("4", "5")],
1207 expected_epa_vcpu_set_result
= False
1209 "numa-node-policy": {
1213 "paired-thread-ids": [
1229 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1230 guest_epa_quota
=guest_epa_quota
,
1233 self
.assertEqual(expected_numa_result
, numa_result
)
1234 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1236 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1237 expected_numa_result
= [{"threads": 3}]
1238 expected_epa_vcpu_set_result
= True
1240 "numa-node-policy": {
1249 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1250 guest_epa_quota
=guest_epa_quota
,
1253 self
.assertEqual(expected_numa_result
, numa_result
)
1254 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1256 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1257 expected_numa_result
= [{"memory": 2}]
1258 expected_epa_vcpu_set_result
= False
1260 "numa-node-policy": {
1269 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1270 guest_epa_quota
=guest_epa_quota
,
1273 self
.assertEqual(expected_numa_result
, numa_result
)
1274 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1276 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1277 expected_numa_result
= [
1283 expected_epa_vcpu_set_result
= False
1285 "numa-node-policy": {
1286 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1290 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1291 guest_epa_quota
=guest_epa_quota
,
1294 self
.assertEqual(expected_numa_result
, numa_result
)
1295 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1297 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1298 expected_numa_result
= [
1309 expected_epa_vcpu_set_result
= False
1311 "numa-node-policy": {
1313 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1314 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1319 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1320 guest_epa_quota
=guest_epa_quota
,
1323 self
.assertEqual(expected_numa_result
, numa_result
)
1324 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1326 def test__process_guest_epa_numa_params_with_1_node(self
):
1327 expected_numa_result
= [
1332 "paired_threads": 3,
1333 "paired-threads-id": [("0", "1"), ("4", "5")],
1338 expected_epa_vcpu_set_result
= True
1340 "numa-node-policy": {
1345 "num-paired-threads": "3",
1346 "paired-thread-ids": [
1364 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1365 guest_epa_quota
=guest_epa_quota
,
1368 self
.assertEqual(expected_numa_result
, numa_result
)
1369 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1371 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1372 expected_numa_result
= [
1375 "paired_threads": 3,
1376 "paired-threads-id": [("0", "1"), ("4", "5")],
1382 "paired_threads": 7,
1383 "paired-threads-id": [("2", "3"), ("5", "6")],
1388 expected_epa_vcpu_set_result
= True
1390 "numa-node-policy": {
1395 "num-paired-threads": "3",
1396 "paired-thread-ids": [
1413 "num-paired-threads": "7",
1414 "paired-thread-ids": [
1432 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1433 guest_epa_quota
=guest_epa_quota
,
1436 self
.assertEqual(expected_numa_result
, numa_result
)
1437 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1439 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1440 expected_numa_result
= {}
1441 expected_epa_vcpu_set_result
= False
1442 guest_epa_quota
= {}
1444 epa_vcpu_set
= False
1446 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1447 guest_epa_quota
=guest_epa_quota
,
1448 vcpu_count
=vcpu_count
,
1449 epa_vcpu_set
=epa_vcpu_set
,
1452 self
.assertDictEqual(expected_numa_result
, numa_result
)
1453 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1455 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1456 expected_numa_result
= {}
1457 expected_epa_vcpu_set_result
= False
1459 "no-cpu-pinning-policy": "here",
1462 epa_vcpu_set
= False
1464 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1465 guest_epa_quota
=guest_epa_quota
,
1466 vcpu_count
=vcpu_count
,
1467 epa_vcpu_set
=epa_vcpu_set
,
1470 self
.assertDictEqual(expected_numa_result
, numa_result
)
1471 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1473 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1474 expected_numa_result
= {}
1475 expected_epa_vcpu_set_result
= True
1477 "cpu-pinning-policy": "DEDICATED",
1482 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1483 guest_epa_quota
=guest_epa_quota
,
1484 vcpu_count
=vcpu_count
,
1485 epa_vcpu_set
=epa_vcpu_set
,
1488 self
.assertDictEqual(expected_numa_result
, numa_result
)
1489 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1491 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self
):
1492 expected_numa_result
= {"threads": 3}
1493 expected_epa_vcpu_set_result
= True
1495 "cpu-pinning-policy": "DEDICATED",
1496 "cpu-thread-pinning-policy": "PREFER",
1499 epa_vcpu_set
= False
1501 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1502 guest_epa_quota
=guest_epa_quota
,
1503 vcpu_count
=vcpu_count
,
1504 epa_vcpu_set
=epa_vcpu_set
,
1507 self
.assertDictEqual(expected_numa_result
, numa_result
)
1508 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1510 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self
):
1511 expected_numa_result
= {"cores": 3}
1512 expected_epa_vcpu_set_result
= True
1514 "cpu-pinning-policy": "DEDICATED",
1515 "cpu-thread-pinning-policy": "ISOLATE",
1518 epa_vcpu_set
= False
1520 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1521 guest_epa_quota
=guest_epa_quota
,
1522 vcpu_count
=vcpu_count
,
1523 epa_vcpu_set
=epa_vcpu_set
,
1526 self
.assertDictEqual(expected_numa_result
, numa_result
)
1527 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1529 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self
):
1530 expected_numa_result
= {"threads": 3}
1531 expected_epa_vcpu_set_result
= True
1533 "cpu-pinning-policy": "DEDICATED",
1534 "cpu-thread-pinning-policy": "REQUIRE",
1537 epa_vcpu_set
= False
1539 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1540 guest_epa_quota
=guest_epa_quota
,
1541 vcpu_count
=vcpu_count
,
1542 epa_vcpu_set
=epa_vcpu_set
,
1545 self
.assertDictEqual(expected_numa_result
, numa_result
)
1546 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1548 def test__process_guest_epa_cpu_pinning_params(self
):
1549 expected_numa_result
= {"threads": 3}
1550 expected_epa_vcpu_set_result
= True
1552 "cpu-pinning-policy": "DEDICATED",
1555 epa_vcpu_set
= False
1557 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1558 guest_epa_quota
=guest_epa_quota
,
1559 vcpu_count
=vcpu_count
,
1560 epa_vcpu_set
=epa_vcpu_set
,
1563 self
.assertDictEqual(expected_numa_result
, numa_result
)
1564 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1566 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1567 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1568 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1569 def test__process_guest_epa_params_with_empty_params(
1571 guest_epa_numa_params
,
1572 guest_epa_cpu_pinning_params
,
1573 guest_epa_quota_params
,
1575 expected_result
= {}
1578 result
= Ns
._process
_epa
_params
(
1579 target_flavor
=target_flavor
,
1582 self
.assertDictEqual(expected_result
, result
)
1583 self
.assertFalse(guest_epa_numa_params
.called
)
1584 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1585 self
.assertFalse(guest_epa_quota_params
.called
)
1587 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1588 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1589 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1590 def test__process_guest_epa_params_with_wrong_params(
1592 guest_epa_numa_params
,
1593 guest_epa_cpu_pinning_params
,
1594 guest_epa_quota_params
,
1596 expected_result
= {}
1598 "no-guest-epa": "here",
1601 result
= Ns
._process
_epa
_params
(
1602 target_flavor
=target_flavor
,
1605 self
.assertDictEqual(expected_result
, result
)
1606 self
.assertFalse(guest_epa_numa_params
.called
)
1607 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1608 self
.assertFalse(guest_epa_quota_params
.called
)
1610 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1611 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1612 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1613 def test__process_guest_epa_params(
1615 guest_epa_numa_params
,
1616 guest_epa_cpu_pinning_params
,
1617 guest_epa_quota_params
,
1620 "mem-policy": "STRICT",
1625 "numa-node-policy": {
1626 "mem-policy": "STRICT",
1631 guest_epa_numa_params
.return_value
= ({}, False)
1632 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1633 guest_epa_quota_params
.return_value
= {}
1635 result
= Ns
._process
_epa
_params
(
1636 target_flavor
=target_flavor
,
1639 self
.assertDictEqual(expected_result
, result
)
1640 self
.assertTrue(guest_epa_numa_params
.called
)
1641 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1642 self
.assertTrue(guest_epa_quota_params
.called
)
1644 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1645 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1646 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1647 def test__process_guest_epa_params_with_mempage_size(
1649 guest_epa_numa_params
,
1650 guest_epa_cpu_pinning_params
,
1651 guest_epa_quota_params
,
1654 "mempage-size": "1G",
1655 "mem-policy": "STRICT",
1660 "mempage-size": "1G",
1661 "numa-node-policy": {
1662 "mem-policy": "STRICT",
1667 guest_epa_numa_params
.return_value
= ({}, False)
1668 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1669 guest_epa_quota_params
.return_value
= {}
1671 result
= Ns
._process
_epa
_params
(
1672 target_flavor
=target_flavor
,
1675 self
.assertDictEqual(expected_result
, result
)
1676 self
.assertTrue(guest_epa_numa_params
.called
)
1677 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1678 self
.assertTrue(guest_epa_quota_params
.called
)
1680 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1681 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1682 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1683 def test__process_guest_epa_params_with_numa(
1685 guest_epa_numa_params
,
1686 guest_epa_cpu_pinning_params
,
1687 guest_epa_quota_params
,
1690 "mempage-size": "1G",
1691 "cpu-pinning-policy": "DEDICATED",
1692 "cpu-thread-pinning-policy": "PREFER",
1697 "paired-threads": 3,
1698 "paired-threads-id": [("0", "1"), ("4", "5")],
1702 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1703 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1704 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1705 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1710 "mempage-size": "1G",
1711 "cpu-pinning-policy": "DEDICATED",
1712 "cpu-thread-pinning-policy": "PREFER",
1713 "numa-node-policy": {
1718 "num-paired-threads": "3",
1719 "paired-thread-ids": [
1758 guest_epa_numa_params
.return_value
= (
1762 "paired-threads": 3,
1763 "paired-threads-id": [("0", "1"), ("4", "5")],
1770 guest_epa_cpu_pinning_params
.return_value
= (
1776 guest_epa_quota_params
.return_value
= {
1799 result
= Ns
._process
_epa
_params
(
1800 target_flavor
=target_flavor
,
1802 self
.assertEqual(expected_result
, result
)
1803 self
.assertTrue(guest_epa_numa_params
.called
)
1804 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1805 self
.assertTrue(guest_epa_quota_params
.called
)
1807 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1808 def test__process_flavor_params_with_empty_target_flavor(
1816 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1821 target_record_id
= ""
1823 with self
.assertRaises(KeyError):
1824 Ns
._process
_flavor
_params
(
1825 target_flavor
=target_flavor
,
1828 target_record_id
=target_record_id
,
1831 self
.assertFalse(epa_params
.called
)
1833 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1834 def test__process_flavor_params_with_wrong_target_flavor(
1839 "no-target-flavor": "here",
1843 target_record_id
= ""
1845 with self
.assertRaises(KeyError):
1846 Ns
._process
_flavor
_params
(
1847 target_flavor
=target_flavor
,
1850 target_record_id
=target_record_id
,
1853 self
.assertFalse(epa_params
.called
)
1855 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1856 def test__process_flavor_params_with_empty_indata(
1880 "memory-mb": "1024",
1885 target_record_id
= ""
1887 epa_params
.return_value
= {}
1889 result
= Ns
._process
_flavor
_params
(
1890 target_flavor
=target_flavor
,
1893 target_record_id
=target_record_id
,
1896 self
.assertTrue(epa_params
.called
)
1897 self
.assertDictEqual(result
, expected_result
)
1899 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1900 def test__process_flavor_params_with_wrong_indata(
1924 "memory-mb": "1024",
1931 target_record_id
= ""
1933 epa_params
.return_value
= {}
1935 result
= Ns
._process
_flavor
_params
(
1936 target_flavor
=target_flavor
,
1939 target_record_id
=target_record_id
,
1942 self
.assertTrue(epa_params
.called
)
1943 self
.assertDictEqual(result
, expected_result
)
1945 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1946 def test__process_flavor_params_with_ephemeral_disk(
1954 db
.get_one
.return_value
= {
1955 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1960 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1964 "id": "without_volumes-vnf",
1965 "product-name": "without_volumes-vnf",
1968 "id": "without_volumes-VM",
1969 "name": "without_volumes-VM",
1970 "sw-image-desc": "ubuntu20.04",
1971 "alternative-sw-image-desc": [
1973 "ubuntu20.04-azure",
1975 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
1979 "virtual-storage-desc": [
1980 {"id": "root-volume", "size-of-storage": "10"},
1982 "id": "ephemeral-volume",
1983 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1984 "size-of-storage": "1",
1990 "path": "/app/storage/",
2018 "memory-mb": "1024",
2026 "ns-flavor-id": "test_id",
2027 "virtual-storages": [
2029 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2030 "size-of-storage": "10",
2035 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2040 target_record_id
= ""
2042 epa_params
.return_value
= {}
2044 result
= Ns
._process
_flavor
_params
(
2045 target_flavor
=target_flavor
,
2048 target_record_id
=target_record_id
,
2052 self
.assertTrue(epa_params
.called
)
2053 self
.assertDictEqual(result
, expected_result
)
2055 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2056 def test__process_flavor_params_with_swap_disk(
2083 "memory-mb": "1024",
2091 "ns-flavor-id": "test_id",
2092 "virtual-storages": [
2094 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2095 "size-of-storage": "20",
2100 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2105 target_record_id
= ""
2107 epa_params
.return_value
= {}
2109 result
= Ns
._process
_flavor
_params
(
2110 target_flavor
=target_flavor
,
2113 target_record_id
=target_record_id
,
2116 self
.assertTrue(epa_params
.called
)
2117 self
.assertDictEqual(result
, expected_result
)
2119 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2120 def test__process_flavor_params_with_persistent_root_disk(
2128 db
.get_one
.return_value
= {
2129 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2134 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2138 "id": "several_volumes-vnf",
2139 "product-name": "several_volumes-vnf",
2142 "id": "several_volumes-VM",
2143 "name": "several_volumes-VM",
2144 "sw-image-desc": "ubuntu20.04",
2145 "alternative-sw-image-desc": [
2147 "ubuntu20.04-azure",
2149 "virtual-storage-desc": [
2150 "persistent-root-volume",
2155 "virtual-storage-desc": [
2157 "id": "persistent-root-volume",
2158 "type-of-storage": "persistent-storage:persistent-storage",
2159 "size-of-storage": "10",
2165 "path": "/app/storage/",
2191 "memory-mb": "1024",
2199 "vdu-name": "several_volumes-VM",
2200 "ns-flavor-id": "test_id",
2201 "virtual-storages": [
2203 "type-of-storage": "persistent-storage:persistent-storage",
2204 "size-of-storage": "10",
2209 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2214 target_record_id
= ""
2216 epa_params
.return_value
= {}
2218 result
= Ns
._process
_flavor
_params
(
2219 target_flavor
=target_flavor
,
2222 target_record_id
=target_record_id
,
2226 self
.assertTrue(epa_params
.called
)
2227 self
.assertDictEqual(result
, expected_result
)
2229 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2230 def test__process_flavor_params_with_epa_params(
2241 "numa": "there-is-numa-here",
2252 "numa": "there-is-numa-here",
2261 "memory-mb": "1024",
2269 "ns-flavor-id": "test_id",
2272 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2277 target_record_id
= ""
2279 epa_params
.return_value
= {
2280 "numa": "there-is-numa-here",
2283 result
= Ns
._process
_flavor
_params
(
2284 target_flavor
=target_flavor
,
2287 target_record_id
=target_record_id
,
2290 self
.assertTrue(epa_params
.called
)
2291 self
.assertDictEqual(result
, expected_result
)
2293 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2294 def test__process_flavor_params(
2302 db
.get_one
.return_value
= {
2303 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2308 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2312 "id": "without_volumes-vnf",
2313 "product-name": "without_volumes-vnf",
2316 "id": "without_volumes-VM",
2317 "name": "without_volumes-VM",
2318 "sw-image-desc": "ubuntu20.04",
2319 "alternative-sw-image-desc": [
2321 "ubuntu20.04-azure",
2323 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2327 "virtual-storage-desc": [
2328 {"id": "root-volume", "size-of-storage": "10"},
2330 "id": "ephemeral-volume",
2331 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2332 "size-of-storage": "1",
2338 "path": "/app/storage/",
2353 "numa": "there-is-numa-here",
2366 "numa": "there-is-numa-here",
2375 "memory-mb": "1024",
2383 "ns-flavor-id": "test_id",
2384 "virtual-storages": [
2386 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2387 "size-of-storage": "10",
2390 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2391 "size-of-storage": "20",
2396 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2401 target_record_id
= ""
2403 epa_params
.return_value
= {
2404 "numa": "there-is-numa-here",
2407 result
= Ns
._process
_flavor
_params
(
2408 target_flavor
=target_flavor
,
2411 target_record_id
=target_record_id
,
2415 self
.assertTrue(epa_params
.called
)
2416 self
.assertDictEqual(result
, expected_result
)
2418 def test__ip_profile_to_ro_with_none(self
):
2421 result
= Ns
._ip
_profile
_to
_ro
(
2422 ip_profile
=ip_profile
,
2425 self
.assertIsNone(result
)
2427 def test__ip_profile_to_ro_with_empty_profile(self
):
2430 result
= Ns
._ip
_profile
_to
_ro
(
2431 ip_profile
=ip_profile
,
2434 self
.assertIsNone(result
)
2436 def test__ip_profile_to_ro_with_wrong_profile(self
):
2438 "no-profile": "here",
2441 "ip_version": "IPv4",
2442 "subnet_address": None,
2443 "gateway_address": None,
2444 "dhcp_enabled": False,
2445 "dhcp_start_address": None,
2449 result
= Ns
._ip
_profile
_to
_ro
(
2450 ip_profile
=ip_profile
,
2453 self
.assertDictEqual(expected_result
, result
)
2455 def test__ip_profile_to_ro_with_ipv4_profile(self
):
2457 "ip-version": "ipv4",
2458 "subnet-address": "192.168.0.0/24",
2459 "gateway-address": "192.168.0.254",
2462 "start-address": "192.168.0.10",
2467 "ip_version": "IPv4",
2468 "subnet_address": "192.168.0.0/24",
2469 "gateway_address": "192.168.0.254",
2470 "dhcp_enabled": True,
2471 "dhcp_start_address": "192.168.0.10",
2475 result
= Ns
._ip
_profile
_to
_ro
(
2476 ip_profile
=ip_profile
,
2479 self
.assertDictEqual(expected_result
, result
)
2481 def test__ip_profile_to_ro_with_ipv6_profile(self
):
2483 "ip-version": "ipv6",
2484 "subnet-address": "2001:0200:0001::/48",
2485 "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2488 "start-address": "2001:0200:0001::0010",
2493 "ip_version": "IPv6",
2494 "subnet_address": "2001:0200:0001::/48",
2495 "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2496 "dhcp_enabled": True,
2497 "dhcp_start_address": "2001:0200:0001::0010",
2501 result
= Ns
._ip
_profile
_to
_ro
(
2502 ip_profile
=ip_profile
,
2505 self
.assertDictEqual(expected_result
, result
)
2507 def test__ip_profile_to_ro_with_dns_server(self
):
2509 "ip-version": "ipv4",
2510 "subnet-address": "192.168.0.0/24",
2511 "gateway-address": "192.168.0.254",
2514 "start-address": "192.168.0.10",
2519 "address": "8.8.8.8",
2522 "address": "1.1.1.1",
2525 "address": "1.0.0.1",
2530 "ip_version": "IPv4",
2531 "subnet_address": "192.168.0.0/24",
2532 "gateway_address": "192.168.0.254",
2533 "dhcp_enabled": True,
2534 "dhcp_start_address": "192.168.0.10",
2536 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2539 result
= Ns
._ip
_profile
_to
_ro
(
2540 ip_profile
=ip_profile
,
2543 self
.assertDictEqual(expected_result
, result
)
2545 def test__ip_profile_to_ro_with_security_group(self
):
2547 "ip-version": "ipv4",
2548 "subnet-address": "192.168.0.0/24",
2549 "gateway-address": "192.168.0.254",
2552 "start-address": "192.168.0.10",
2556 "some-security-group": "here",
2560 "ip_version": "IPv4",
2561 "subnet_address": "192.168.0.0/24",
2562 "gateway_address": "192.168.0.254",
2563 "dhcp_enabled": True,
2564 "dhcp_start_address": "192.168.0.10",
2567 "some-security-group": "here",
2571 result
= Ns
._ip
_profile
_to
_ro
(
2572 ip_profile
=ip_profile
,
2575 self
.assertDictEqual(expected_result
, result
)
2577 def test__ip_profile_to_ro(self
):
2579 "ip-version": "ipv4",
2580 "subnet-address": "192.168.0.0/24",
2581 "gateway-address": "192.168.0.254",
2584 "start-address": "192.168.0.10",
2589 "address": "8.8.8.8",
2592 "address": "1.1.1.1",
2595 "address": "1.0.0.1",
2599 "some-security-group": "here",
2603 "ip_version": "IPv4",
2604 "subnet_address": "192.168.0.0/24",
2605 "gateway_address": "192.168.0.254",
2606 "dhcp_enabled": True,
2607 "dhcp_start_address": "192.168.0.10",
2609 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2611 "some-security-group": "here",
2615 result
= Ns
._ip
_profile
_to
_ro
(
2616 ip_profile
=ip_profile
,
2619 self
.assertDictEqual(expected_result
, result
)
2621 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2622 def test__process_net_params_with_empty_params(
2633 "provider_network": "some-profile-here",
2635 target_record_id
= ""
2638 "net_name": "ns-name-vld-name",
2639 "net_type": "bridge",
2641 "some_ip_profile": "here",
2643 "provider_network_profile": "some-profile-here",
2647 ip_profile_to_ro
.return_value
= {
2648 "some_ip_profile": "here",
2651 result
= Ns
._process
_net
_params
(
2652 target_vld
=target_vld
,
2655 target_record_id
=target_record_id
,
2658 self
.assertDictEqual(expected_result
, result
)
2659 self
.assertTrue(ip_profile_to_ro
.called
)
2661 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2662 def test__process_net_params_with_vim_info_sdn(
2674 "sdn-ports": ["some", "ports", "here"],
2675 "vlds": ["some", "vlds", "here"],
2678 target_record_id
= "vld.sdn.something"
2681 "sdn-ports": ["some", "ports", "here"],
2682 "vlds": ["some", "vlds", "here"],
2687 result
= Ns
._process
_net
_params
(
2688 target_vld
=target_vld
,
2691 target_record_id
=target_record_id
,
2694 self
.assertDictEqual(expected_result
, result
)
2695 self
.assertFalse(ip_profile_to_ro
.called
)
2697 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2698 def test__process_net_params_with_vim_info_sdn_target_vim(
2710 "sdn-ports": ["some", "ports", "here"],
2711 "vlds": ["some", "vlds", "here"],
2712 "target_vim": "some-vim",
2715 target_record_id
= "vld.sdn.something"
2717 "depends_on": ["some-vim vld.sdn"],
2719 "sdn-ports": ["some", "ports", "here"],
2720 "vlds": ["some", "vlds", "here"],
2721 "target_vim": "some-vim",
2726 result
= Ns
._process
_net
_params
(
2727 target_vld
=target_vld
,
2730 target_record_id
=target_record_id
,
2733 self
.assertDictEqual(expected_result
, result
)
2734 self
.assertFalse(ip_profile_to_ro
.called
)
2736 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2737 def test__process_net_params_with_vim_network_name(
2748 "vim_network_name": "some-network-name",
2750 target_record_id
= "vld.sdn.something"
2754 "name": "some-network-name",
2759 result
= Ns
._process
_net
_params
(
2760 target_vld
=target_vld
,
2763 target_record_id
=target_record_id
,
2766 self
.assertDictEqual(expected_result
, result
)
2767 self
.assertFalse(ip_profile_to_ro
.called
)
2769 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2770 def test__process_net_params_with_vim_network_id(
2781 "vim_network_id": "some-network-id",
2783 target_record_id
= "vld.sdn.something"
2787 "id": "some-network-id",
2792 result
= Ns
._process
_net
_params
(
2793 target_vld
=target_vld
,
2796 target_record_id
=target_record_id
,
2799 self
.assertDictEqual(expected_result
, result
)
2800 self
.assertFalse(ip_profile_to_ro
.called
)
2802 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2803 def test__process_net_params_with_mgmt_network(
2810 "mgmt-network": "some-mgmt-network",
2816 target_record_id
= "vld.sdn.something"
2824 result
= Ns
._process
_net
_params
(
2825 target_vld
=target_vld
,
2828 target_record_id
=target_record_id
,
2831 self
.assertDictEqual(expected_result
, result
)
2832 self
.assertFalse(ip_profile_to_ro
.called
)
2834 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2835 def test__process_net_params_with_underlay_eline(
2841 "underlay": "some-underlay-here",
2848 "provider_network": "some-profile-here",
2850 target_record_id
= ""
2854 "some_ip_profile": "here",
2856 "net_name": "ns-name-vld-name",
2858 "provider_network_profile": "some-profile-here",
2862 ip_profile_to_ro
.return_value
= {
2863 "some_ip_profile": "here",
2866 result
= Ns
._process
_net
_params
(
2867 target_vld
=target_vld
,
2870 target_record_id
=target_record_id
,
2873 self
.assertDictEqual(expected_result
, result
)
2874 self
.assertTrue(ip_profile_to_ro
.called
)
2876 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2877 def test__process_net_params_with_underlay_elan(
2883 "underlay": "some-underlay-here",
2890 "provider_network": "some-profile-here",
2892 target_record_id
= ""
2896 "some_ip_profile": "here",
2898 "net_name": "ns-name-vld-name",
2900 "provider_network_profile": "some-profile-here",
2904 ip_profile_to_ro
.return_value
= {
2905 "some_ip_profile": "here",
2908 result
= Ns
._process
_net
_params
(
2909 target_vld
=target_vld
,
2912 target_record_id
=target_record_id
,
2915 self
.assertDictEqual(expected_result
, result
)
2916 self
.assertTrue(ip_profile_to_ro
.called
)
2918 def test__get_cloud_init_exception(self
):
2919 db_mock
= MagicMock(name
="database mock")
2924 with self
.assertRaises(NsException
):
2925 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2927 def test__get_cloud_init_file_fs_exception(self
):
2928 db_mock
= MagicMock(name
="database mock")
2931 location
= "vnfr_id_123456:file:test_file"
2932 db_mock
.get_one
.return_value
= {
2935 "folder": "/home/osm",
2936 "pkg-dir": "vnfr_test_dir",
2941 with self
.assertRaises(NsException
):
2942 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2944 def test__get_cloud_init_file(self
):
2945 db_mock
= MagicMock(name
="database mock")
2946 fs_mock
= MagicMock(name
="filesystem mock")
2947 file_mock
= MagicMock(name
="file mock")
2949 location
= "vnfr_id_123456:file:test_file"
2950 cloud_init_content
= "this is a cloud init file content"
2952 db_mock
.get_one
.return_value
= {
2955 "folder": "/home/osm",
2956 "pkg-dir": "vnfr_test_dir",
2960 fs_mock
.file_open
.return_value
= file_mock
2961 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2963 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2965 self
.assertEqual(cloud_init_content
, result
)
2967 def test__get_cloud_init_vdu(self
):
2968 db_mock
= MagicMock(name
="database mock")
2971 location
= "vnfr_id_123456:vdu:0"
2972 cloud_init_content
= "this is a cloud init file content"
2974 db_mock
.get_one
.return_value
= {
2977 "cloud-init": cloud_init_content
,
2982 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2984 self
.assertEqual(cloud_init_content
, result
)
2986 @patch("jinja2.Environment.__init__")
2987 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2988 cloud_init_content
= None
2992 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
2994 with self
.assertRaises(NsException
):
2996 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2999 @patch("jinja2.Environment.__init__")
3000 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
3001 cloud_init_content
= None
3005 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
3007 with self
.assertRaises(NsException
):
3009 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3012 @patch("jinja2.Environment.__init__")
3013 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
3014 cloud_init_content
= None
3018 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
3020 with self
.assertRaises(NsException
):
3022 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3025 def test_rendering_jinja2_temp_without_special_characters(self
):
3026 cloud_init_content
= """
3029 table_type: {{type}}
3031 overwrite: {{is_override}}
3034 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3038 "is_override": "False",
3039 "command": "; mkdir abc",
3041 context
= "cloud-init for VM"
3042 expected_result
= """
3050 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
3052 result
= Ns
._parse
_jinja
2(
3053 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3055 self
.assertEqual(result
, expected_result
)
3057 def test_rendering_jinja2_temp_with_special_characters(self
):
3058 cloud_init_content
= """
3061 table_type: {{type}}
3063 overwrite: {{is_override}}
3066 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3070 "is_override": "False",
3071 "command": "& rm -rf",
3073 context
= "cloud-init for VM"
3074 expected_result
= """
3082 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3084 result
= Ns
._parse
_jinja
2(
3085 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3087 self
.assertNotEqual(result
, expected_result
)
3089 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
3090 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
3091 mock_environment
.return_value
= Environment(
3092 undefined
=StrictUndefined
,
3093 autoescape
=select_autoescape(default_for_string
=False, default
=False),
3095 cloud_init_content
= """
3098 table_type: {{type}}
3100 overwrite: {{is_override}}
3103 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3107 "is_override": "False",
3108 "command": "& rm -rf /",
3110 context
= "cloud-init for VM"
3111 expected_result
= """
3119 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3121 result
= Ns
._parse
_jinja
2(
3122 cloud_init_content
=cloud_init_content
,
3126 self
.assertEqual(result
, expected_result
)
3128 @patch("osm_ng_ro.ns.Ns._assign_vim")
3129 def test__rebuild_start_stop_task(self
, assign_vim
):
3132 actions
= ["start", "stop", "rebuild"]
3133 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3134 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
3136 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3137 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3139 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3140 t
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3141 for action
in actions
:
3143 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3144 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3145 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3146 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
3147 "status": "SCHEDULED",
3150 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
3151 "target_record_id": t
,
3153 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3157 extra_dict
["params"] = {
3158 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3161 task
= self
.ns
.rebuild_start_stop_task(
3171 self
.assertEqual(task
.get("action_id"), action_id
)
3172 self
.assertEqual(task
.get("nsr_id"), nsr_id
)
3173 self
.assertEqual(task
.get("target_id"), target_vim
)
3174 self
.assertDictEqual(task
, expected_result
)
3176 @patch("osm_ng_ro.ns.Ns._assign_vim")
3177 def test_verticalscale_task(self
, assign_vim
):
3181 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3182 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3184 target_record_id
= (
3185 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3186 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3190 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3191 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3192 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3193 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3194 "status": "SCHEDULED",
3196 "item": "verticalscale",
3197 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3198 "target_record_id": target_record_id
,
3200 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3201 "flavor_dict": "flavor_dict",
3205 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3207 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3210 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3211 extra_dict
["params"] = {
3212 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3213 "flavor_dict": "flavor_dict",
3215 task
= self
.ns
.verticalscale_task(
3216 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3219 self
.assertDictEqual(task
, expected_result
)
3221 @patch("osm_ng_ro.ns.Ns._assign_vim")
3222 def test_migrate_task(self
, assign_vim
):
3226 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3227 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3229 target_record_id
= (
3230 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3231 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3235 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3236 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3237 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3238 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3239 "status": "SCHEDULED",
3242 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3243 "target_record_id": target_record_id
,
3245 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3246 "migrate_host": "migrateToHost",
3250 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3252 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3255 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3256 extra_dict
["params"] = {
3257 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3258 "migrate_host": "migrateToHost",
3260 task
= self
.ns
.migrate_task(
3261 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3264 self
.assertDictEqual(task
, expected_result
)
3267 class TestProcessVduParams(unittest
.TestCase
):
3270 self
.logger
= CopyingMock(autospec
=True)
3272 def test_find_persistent_root_volumes_empty_instantiation_vol_list(self
):
3273 """Find persistent root volume, instantiation_vol_list is empty."""
3274 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3275 target_vdu
= target_vdu_wth_persistent_storage
3276 vdu_instantiation_volumes_list
= []
3278 expected_persist_root_disk
= {
3279 "persistent-root-volume": {
3280 "image_id": "ubuntu20.04",
3284 expected_disk_list
= [
3286 "image_id": "ubuntu20.04",
3290 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3291 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3293 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3294 self
.assertEqual(disk_list
, expected_disk_list
)
3295 self
.assertEqual(len(disk_list
), 1)
3297 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(self
):
3298 """Find persistent root volume, always selects the first vsd as root volume."""
3299 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3300 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3301 "persistent-volume2",
3302 "persistent-root-volume",
3305 target_vdu
= target_vdu_wth_persistent_storage
3306 vdu_instantiation_volumes_list
= []
3308 expected_persist_root_disk
= {
3309 "persistent-volume2": {
3310 "image_id": "ubuntu20.04",
3314 expected_disk_list
= [
3316 "image_id": "ubuntu20.04",
3320 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3321 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3323 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3324 self
.assertEqual(disk_list
, expected_disk_list
)
3325 self
.assertEqual(len(disk_list
), 1)
3327 def test_find_persistent_root_volumes_empty_size_of_storage(self
):
3328 """Find persistent root volume, size of storage is empty."""
3329 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3330 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3331 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3332 "persistent-volume2",
3333 "persistent-root-volume",
3336 target_vdu
= target_vdu_wth_persistent_storage
3337 vdu_instantiation_volumes_list
= []
3339 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3340 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3342 self
.assertEqual(persist_root_disk
, None)
3343 self
.assertEqual(disk_list
, [])
3345 def test_find_persistent_root_empty_disk_list(self
):
3346 """Find persistent root volume, empty disk list."""
3347 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3348 target_vdu
= target_vdu_wth_persistent_storage
3349 vdu_instantiation_volumes_list
= []
3351 expected_persist_root_disk
= {
3352 "persistent-root-volume": {
3353 "image_id": "ubuntu20.04",
3357 expected_disk_list
= [
3359 "image_id": "ubuntu20.04",
3363 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3364 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3366 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3367 self
.assertEqual(disk_list
, expected_disk_list
)
3368 self
.assertEqual(len(disk_list
), 1)
def test_find_persistent_root_volumes_target_vdu_mismatch(self):
    """Find persistent root volume, target vdu name is not matching.

    The VDU name inside the descriptor copy is changed to a different
    capitalisation ("Several_Volumes-VM"), so the call is expected to
    return None and to leave the disk list empty.
    """
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["vdu"][0]["name"] = "Several_Volumes-VM"
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    # NOTE(review): assumed to start empty, as the assertion below compares
    # it with an empty list after the call -- confirm.
    disk_list = []
    result = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # assertIsNone checks identity with None and gives a clearer failure
    # message than assertEqual(result, None).
    self.assertIsNone(result)
    # Equality with [] already implies a length of zero, so no separate
    # len() assertion is needed.
    self.assertEqual(disk_list, [])
3384 def test_find_persistent_root_volumes_with_instantiation_vol_list(self
):
3385 """Find persistent root volume, existing volume needs to be used."""
3386 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3387 target_vdu
= target_vdu_wth_persistent_storage
3388 vdu_instantiation_volumes_list
= [
3390 "vim-volume-id": vim_volume_id
,
3391 "name": "persistent-root-volume",
3395 expected_persist_root_disk
= {
3396 "persistent-root-volume": {
3397 "vim_volume_id": vim_volume_id
,
3398 "image_id": "ubuntu20.04",
3401 expected_disk_list
= [
3403 "vim_volume_id": vim_volume_id
,
3404 "image_id": "ubuntu20.04",
3407 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3408 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3410 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3411 self
.assertEqual(disk_list
, expected_disk_list
)
3412 self
.assertEqual(len(disk_list
), 1)
3414 def test_find_persistent_root_volumes_invalid_instantiation_params(self
):
3415 """Find persistent root volume, existing volume id keyword is invalid."""
3416 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3417 target_vdu
= target_vdu_wth_persistent_storage
3418 vdu_instantiation_volumes_list
= [
3420 "volume-id": vim_volume_id
,
3421 "name": "persistent-root-volume",
3425 with self
.assertRaises(KeyError):
3426 self
.ns
.find_persistent_root_volumes(
3427 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3430 self
.assertEqual(disk_list
, [])
3431 self
.assertEqual(len(disk_list
), 0)
3433 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3436 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3437 persistent_root_disk
= {
3438 "persistent-root-volume": {
3439 "image_id": "ubuntu20.04",
3444 target_vdu
= target_vdu_wth_persistent_storage
3445 vdu_instantiation_volumes_list
= []
3448 "image_id": "ubuntu20.04",
3453 expected_disk_list
= [
3455 "image_id": "ubuntu20.04",
3462 self
.ns
.find_persistent_volumes(
3463 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3465 self
.assertEqual(disk_list
, expected_disk_list
)
3467 def test_find_persistent_volumes_vdu_wth_inst_vol_list(self
):
3468 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3469 persistent_root_disk
= {
3470 "persistent-root-volume": {
3471 "image_id": "ubuntu20.04",
3475 vdu_instantiation_volumes_list
= [
3477 "vim-volume-id": vim_volume_id
,
3478 "name": "persistent-volume2",
3481 target_vdu
= target_vdu_wth_persistent_storage
3484 "image_id": "ubuntu20.04",
3488 expected_disk_list
= [
3490 "image_id": "ubuntu20.04",
3494 "vim_volume_id": vim_volume_id
,
3497 self
.ns
.find_persistent_volumes(
3498 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3500 self
.assertEqual(disk_list
, expected_disk_list
)
def test_find_persistent_volumes_vdu_wthout_persistent_storage(self):
    """Find persistent ordinary volume, there is not any persistent disk.

    Called with an empty persistent root disk dict and no instantiation
    volumes; the disk list handed to the call must still be empty
    afterwards.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    target_vdu = target_vdu_wthout_persistent_storage
    # NOTE(review): assumed to start empty, as in the sibling tests -- confirm.
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # Compare against a fresh literal: asserting disk_list against itself
    # can never fail and would hide a regression.
    self.assertEqual(disk_list, [])
3513 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3516 """There is persistent root disk, but there is not ordinary persistent disk."""
3517 persistent_root_disk
= {
3518 "persistent-root-volume": {
3519 "image_id": "ubuntu20.04",
3523 vdu_instantiation_volumes_list
= []
3524 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3525 target_vdu
["virtual-storages"] = [
3527 "id": "persistent-root-volume",
3528 "size-of-storage": "10",
3529 "type-of-storage": "persistent-storage:persistent-storage",
3532 "id": "ephemeral-volume",
3533 "size-of-storage": "1",
3534 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3539 "image_id": "ubuntu20.04",
3543 self
.ns
.find_persistent_volumes(
3544 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3546 self
.assertEqual(disk_list
, disk_list
)
3548 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(self
):
3549 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3550 vim-volume-id is given as instantiation parameter but disk id is not matching.
3552 vdu_instantiation_volumes_list
= [
3554 "vim-volume-id": vim_volume_id
,
3555 "name": "persistent-volume3",
3558 persistent_root_disk
= {
3559 "persistent-root-volume": {
3560 "image_id": "ubuntu20.04",
3566 "image_id": "ubuntu20.04",
3570 expected_disk_list
= [
3572 "image_id": "ubuntu20.04",
3580 target_vdu
= target_vdu_wth_persistent_storage
3581 self
.ns
.find_persistent_volumes(
3582 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3584 self
.assertEqual(disk_list
, expected_disk_list
)
3586 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3587 """Interfaces are sorted according to position, all have positions."""
3588 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3589 target_vdu
["interfaces"] = [
3592 "ns-vld-id": "datanet",
3597 "ns-vld-id": "mgmtnet",
3601 sorted_interfaces
= [
3604 "ns-vld-id": "mgmtnet",
3609 "ns-vld-id": "datanet",
3613 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3614 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3616 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3617 """Interfaces are sorted according to position, some of them have positions."""
3618 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3619 target_vdu
["interfaces"] = [
3622 "ns-vld-id": "mgmtnet",
3626 "ns-vld-id": "datanet",
3630 sorted_interfaces
= [
3633 "ns-vld-id": "datanet",
3638 "ns-vld-id": "mgmtnet",
3641 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3642 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting a VDU whose interface list is empty leaves the list empty."""
    vdu_under_test = deepcopy(target_vdu_wth_persistent_storage)
    vdu_under_test["interfaces"] = []
    self.ns._sort_vdu_interfaces(vdu_under_test)
    # There is nothing to order, so the list must still be empty afterwards.
    self.assertEqual(vdu_under_test["interfaces"], [])
3652 def test_partially_locate_vdu_interfaces(self
):
3653 """Some interfaces have positions."""
3654 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3655 target_vdu
["interfaces"] = [
3658 "ns-vld-id": "net1",
3660 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3663 "ns-vld-id": "mgmtnet",
3667 "ns-vld-id": "datanet",
3671 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3672 self
.assertDictEqual(
3673 target_vdu
["interfaces"][0],
3676 "ns-vld-id": "datanet",
3680 self
.assertDictEqual(
3681 target_vdu
["interfaces"][2],
3682 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3685 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3686 """Some interfaces have positions, position start from 0."""
3687 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3688 target_vdu
["interfaces"] = [
3691 "ns-vld-id": "net1",
3693 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3696 "ns-vld-id": "mgmtnet",
3700 "ns-vld-id": "datanet",
3704 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3705 self
.assertDictEqual(
3706 target_vdu
["interfaces"][0],
3709 "ns-vld-id": "datanet",
3713 self
.assertDictEqual(
3714 target_vdu
["interfaces"][3],
3715 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Interfaces do not have positions, so the list must come back unchanged.

    The shared module-level fixture is copied before it is attached to the VDU,
    so the method under test can never alter data that other tests rely on.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Copy instead of aliasing the module-level list (keeps tests isolated).
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(interfaces_wthout_positions)
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_result)
3726 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3727 """All interfaces have position."""
3728 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3729 target_vdu
["interfaces"] = interfaces_wth_all_positions
3730 expected_interfaces
= [
3733 "ns-vld-id": "net2",
3738 "ns-vld-id": "mgmtnet",
3743 "ns-vld-id": "net1",
3747 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3748 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3750 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3751 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3752 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3753 """Target_vdu has cloud-init and boot-data-drive."""
3754 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3755 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3756 target_vdu
["boot-data-drive"] = "vda"
3758 mock_get_cloud_init
.return_value
= cloud_init_content
3759 mock_parse_jinja2
.return_value
= user_data
3761 "user-data": user_data
,
3762 "boot-data-drive": "vda",
3764 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3765 self
.assertDictEqual(result
, expected_result
)
3766 mock_get_cloud_init
.assert_called_once_with(
3767 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3769 mock_parse_jinja2
.assert_called_once_with(
3770 cloud_init_content
=cloud_init_content
,
3772 context
="sample-cloud-init-path",
3775 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3776 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3777 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3778 self
, mock_parse_jinja2
, mock_get_cloud_init
3780 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3781 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3782 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3783 target_vdu
["boot-data-drive"] = "vda"
3785 mock_get_cloud_init
.side_effect
= NsException(
3786 "Mismatch descriptor for cloud init."
3789 with self
.assertRaises(NsException
) as err
:
3790 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3791 self
.assertEqual(str(err
.exception
), "Mismatch descriptor for cloud init.")
3793 mock_get_cloud_init
.assert_called_once_with(
3794 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3796 mock_parse_jinja2
.assert_not_called()
3798 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3799 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3800 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3801 self
, mock_parse_jinja2
, mock_get_cloud_init
3803 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3804 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3805 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3806 target_vdu
["boot-data-drive"] = "vda"
3808 mock_get_cloud_init
.return_value
= cloud_init_content
3809 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3811 with self
.assertRaises(NsException
) as err
:
3812 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3813 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3814 mock_get_cloud_init
.assert_called_once_with(
3815 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3817 mock_parse_jinja2
.assert_called_once_with(
3818 cloud_init_content
=cloud_init_content
,
3820 context
="sample-cloud-init-path",
3823 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3824 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3825 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3826 self
, mock_parse_jinja2
, mock_get_cloud_init
3828 """Target_vdu has cloud-init but do not have boot-data-drive."""
3829 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3830 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3832 mock_get_cloud_init
.return_value
= cloud_init_content
3833 mock_parse_jinja2
.return_value
= user_data
3835 "user-data": user_data
,
3837 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3838 self
.assertDictEqual(result
, expected_result
)
3839 mock_get_cloud_init
.assert_called_once_with(
3840 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3842 mock_parse_jinja2
.assert_called_once_with(
3843 cloud_init_content
=cloud_init_content
,
3845 context
="sample-cloud-init-path",
3848 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3849 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3850 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
3851 self
, mock_parse_jinja2
, mock_get_cloud_init
3853 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
3854 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3855 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3856 target_vdu
["boot-data-drive"] = "vda"
3857 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
3858 mock_parse_jinja2
.return_value
= user_data
3860 "user-data": user_data
,
3861 "boot-data-drive": "vda",
3863 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3864 self
.assertDictEqual(result
, expected_result
)
3865 mock_get_cloud_init
.assert_not_called()
3866 mock_parse_jinja2
.assert_called_once_with(
3867 cloud_init_content
=cloud_init_content
,
3869 context
="sample-cloud-init-path",
3872 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3873 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3874 def test_prepare_vdu_cloud_init_no_cloud_init(
3875 self
, mock_parse_jinja2
, mock_get_cloud_init
3877 """Target_vdu do not have cloud-init."""
3878 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3879 target_vdu
["boot-data-drive"] = "vda"
3882 "boot-data-drive": "vda",
3884 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3885 self
.assertDictEqual(result
, expected_result
)
3886 mock_get_cloud_init
.assert_not_called()
3887 mock_parse_jinja2
.assert_not_called()
3889 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
3890 """ns_vld and vnf_vld both exist."""
3893 "ns-vld-id": "mgmtnet",
3894 "vnf-vld-id": "mgmt_cp_int",
3896 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
3897 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3898 interface
, ns_preffix
, vnf_preffix
3900 self
.assertEqual(result
, expected_result
)
def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict is expected to produce an empty net text."""
    empty_interface = {}
    net_text = self.ns._check_vld_information_of_interfaces(
        empty_interface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, "")
3910 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
3911 """Interface dict has only vnf_vld."""
3914 "vnf-vld-id": "mgmt_cp_int",
3916 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
3917 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3918 interface
, ns_preffix
, vnf_preffix
3920 self
.assertEqual(result
, expected_result
)
3922 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
3925 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
3928 "vnf-vld-id": "mgmt_cp_int",
3931 with self
.assertRaises(Exception) as err
:
3932 self
.ns
._check
_vld
_information
_of
_interfaces
(
3933 interface
, ns_preffix
, vnf_preffix
3935 self
.assertEqual(type(err
), TypeError)
3937 def test_prepare_interface_port_security_has_security_details(self
):
3938 """Interface dict has port security details."""
3941 "ns-vld-id": "mgmtnet",
3942 "vnf-vld-id": "mgmt_cp_int",
3943 "port-security-enabled": True,
3944 "port-security-disable-strategy": "allow-address-pairs",
3946 expected_interface
= {
3948 "ns-vld-id": "mgmtnet",
3949 "vnf-vld-id": "mgmt_cp_int",
3950 "port_security": True,
3951 "port_security_disable_strategy": "allow-address-pairs",
3953 self
.ns
._prepare
_interface
_port
_security
(interface
)
3954 self
.assertDictEqual(interface
, expected_interface
)
def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict must still be empty after the call."""
    empty_interface = {}
    self.ns._prepare_interface_port_security(empty_interface)
    # The dict is modified in place by the method, so inspect it directly.
    self.assertDictEqual(empty_interface, {})
3963 def test_prepare_interface_port_security_wthout_port_security(self
):
3964 """Interface dict does not have port security details."""
3967 "ns-vld-id": "mgmtnet",
3968 "vnf-vld-id": "mgmt_cp_int",
3970 expected_interface
= {
3972 "ns-vld-id": "mgmtnet",
3973 "vnf-vld-id": "mgmt_cp_int",
3975 self
.ns
._prepare
_interface
_port
_security
(interface
)
3976 self
.assertDictEqual(interface
, expected_interface
)
3978 def test_create_net_item_of_interface_floating_ip_port_security(self
):
3979 """Interface dict has floating ip, port-security details."""
3982 "vcpi": "sample_vcpi",
3983 "port_security": True,
3984 "port_security_disable_strategy": "allow-address-pairs",
3985 "floating_ip": "10.1.1.12",
3986 "ns-vld-id": "mgmtnet",
3987 "vnf-vld-id": "mgmt_cp_int",
3989 net_text
= f
"{ns_preffix}"
3990 expected_net_item
= {
3992 "port_security": True,
3993 "port_security_disable_strategy": "allow-address-pairs",
3994 "floating_ip": "10.1.1.12",
3995 "net_id": f
"TASK-{ns_preffix}",
3998 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3999 self
.assertDictEqual(result
, expected_net_item
)
4001 def test_create_net_item_of_interface_invalid_net_text(self
):
4002 """net-text is invalid."""
4005 "vcpi": "sample_vcpi",
4006 "port_security": True,
4007 "port_security_disable_strategy": "allow-address-pairs",
4008 "floating_ip": "10.1.1.12",
4009 "ns-vld-id": "mgmtnet",
4010 "vnf-vld-id": "mgmt_cp_int",
4013 with self
.assertRaises(TypeError):
4014 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4016 def test_create_net_item_of_interface_empty_interface(self
):
4017 """Interface dict is empty."""
4019 net_text
= ns_preffix
4020 expected_net_item
= {
4021 "net_id": f
"TASK-{ns_preffix}",
4024 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4025 self
.assertDictEqual(result
, expected_net_item
)
4027 @patch("osm_ng_ro.ns.deep_get")
4028 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
4029 """Interface type is SR-IOV."""
4032 "vcpi": "sample_vcpi",
4033 "port_security": True,
4034 "port_security_disable_strategy": "allow-address-pairs",
4035 "floating_ip": "10.1.1.12",
4036 "ns-vld-id": "mgmtnet",
4037 "vnf-vld-id": "mgmt_cp_int",
4040 mock_deep_get
.return_value
= "SR-IOV"
4041 net_text
= ns_preffix
4043 expected_net_item
= {
4048 self
.ns
._prepare
_type
_of
_interface
(
4049 interface
, tasks_by_target_record_id
, net_text
, net_item
4051 self
.assertDictEqual(net_item
, expected_net_item
)
4054 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4056 mock_deep_get
.assert_called_once_with(
4057 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4060 @patch("osm_ng_ro.ns.deep_get")
4061 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4064 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4067 "vcpi": "sample_vcpi",
4068 "port_security": True,
4069 "port_security_disable_strategy": "allow-address-pairs",
4070 "floating_ip": "10.1.1.12",
4071 "ns-vld-id": "mgmtnet",
4072 "vnf-vld-id": "mgmt_cp_int",
4073 "type": "PCI-PASSTHROUGH",
4075 mock_deep_get
.return_value
= {}
4076 tasks_by_target_record_id
= {}
4077 net_text
= ns_preffix
4079 expected_net_item
= {
4081 "model": "PCI-PASSTHROUGH",
4082 "type": "PCI-PASSTHROUGH",
4084 self
.ns
._prepare
_type
_of
_interface
(
4085 interface
, tasks_by_target_record_id
, net_text
, net_item
4087 self
.assertDictEqual(net_item
, expected_net_item
)
4088 mock_deep_get
.assert_called_once_with(
4089 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4092 @patch("osm_ng_ro.ns.deep_get")
4093 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4094 """Interface type is mgmt."""
4097 "vcpi": "sample_vcpi",
4098 "port_security": True,
4099 "port_security_disable_strategy": "allow-address-pairs",
4100 "floating_ip": "10.1.1.12",
4101 "ns-vld-id": "mgmtnet",
4102 "vnf-vld-id": "mgmt_cp_int",
4105 tasks_by_target_record_id
= {}
4106 net_text
= ns_preffix
4108 expected_net_item
= {
4111 self
.ns
._prepare
_type
_of
_interface
(
4112 interface
, tasks_by_target_record_id
, net_text
, net_item
4114 self
.assertDictEqual(net_item
, expected_net_item
)
4115 mock_deep_get
.assert_not_called()
4117 @patch("osm_ng_ro.ns.deep_get")
4118 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4119 """Interface type is bridge."""
4122 "vcpi": "sample_vcpi",
4123 "port_security": True,
4124 "port_security_disable_strategy": "allow-address-pairs",
4125 "floating_ip": "10.1.1.12",
4126 "ns-vld-id": "mgmtnet",
4127 "vnf-vld-id": "mgmt_cp_int",
4129 tasks_by_target_record_id
= {}
4130 net_text
= ns_preffix
4132 expected_net_item
= {
4136 self
.ns
._prepare
_type
_of
_interface
(
4137 interface
, tasks_by_target_record_id
, net_text
, net_item
4139 self
.assertDictEqual(net_item
, expected_net_item
)
4140 mock_deep_get
.assert_not_called()
4142 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4143 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4144 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4145 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4146 def test_prepare_vdu_interfaces(
4148 mock_type_of_interface
,
4149 mock_item_of_interface
,
4151 mock_vld_information_of_interface
,
4153 """Prepare vdu interfaces successfully."""
4154 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4157 "ns-vld-id": "net1",
4158 "ip-address": "13.2.12.31",
4159 "mgmt-interface": True,
4163 "vnf-vld-id": "net2",
4164 "mac-address": "d0:94:66:ed:fc:e2",
4168 "ns-vld-id": "mgmtnet",
4170 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4172 "params": "test_params",
4173 "find_params": "test_find_params",
4177 net_text_1
= f
"{ns_preffix}:net1"
4178 net_text_2
= f
"{vnf_preffix}:net2"
4179 net_text_3
= f
"{ns_preffix}:mgmtnet"
4182 "net_id": f
"TASK-{ns_preffix}",
4187 "net_id": f
"TASK-{ns_preffix}",
4192 "net_id": f
"TASK-{ns_preffix}",
4195 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4196 mock_vld_information_of_interface
.side_effect
= [
4202 expected_extra_dict
= {
4203 "params": "test_params",
4204 "find_params": "test_find_params",
4205 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4206 "mgmt_vdu_interface": 0,
4208 updated_net_item1
= deepcopy(net_item_1
)
4209 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4210 updated_net_item2
= deepcopy(net_item_2
)
4211 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4212 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4213 self
.ns
._prepare
_vdu
_interfaces
(
4219 tasks_by_target_record_id
,
4222 _call_mock_vld_information_of_interface
= (
4223 mock_vld_information_of_interface
.call_args_list
4226 _call_mock_vld_information_of_interface
[0][0],
4227 (interface_1
, ns_preffix
, vnf_preffix
),
4230 _call_mock_vld_information_of_interface
[1][0],
4231 (interface_2
, ns_preffix
, vnf_preffix
),
4234 _call_mock_vld_information_of_interface
[2][0],
4235 (interface_3
, ns_preffix
, vnf_preffix
),
4238 _call_mock_port_security
= mock_port_security
.call_args_list
4239 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4240 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4241 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4243 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4244 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4245 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4246 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4248 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4250 _call_mock_type_of_interface
[0][0],
4251 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4254 _call_mock_type_of_interface
[1][0],
4255 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4258 _call_mock_type_of_interface
[2][0],
4259 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4261 self
.assertEqual(net_list
, expected_net_list
)
4262 self
.assertEqual(extra_dict
, expected_extra_dict
)
4263 self
.logger
.error
.assert_not_called()
4265 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4266 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4267 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4268 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4269 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4271 mock_type_of_interface
,
4272 mock_item_of_interface
,
4274 mock_vld_information_of_interface
,
4276 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4277 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4280 "ns-vld-id": "net1",
4281 "ip-address": "13.2.12.31",
4282 "mgmt-interface": True,
4286 "vnf-vld-id": "net2",
4287 "mac-address": "d0:94:66:ed:fc:e2",
4291 "ns-vld-id": "mgmtnet",
4293 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4295 "params": "test_params",
4296 "find_params": "test_find_params",
4299 net_text_1
= f
"{ns_preffix}:net1"
4300 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4302 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4304 expected_extra_dict
= {
4305 "params": "test_params",
4306 "find_params": "test_find_params",
4307 "depends_on": [net_text_1
],
4309 with self
.assertRaises(TypeError):
4310 self
.ns
._prepare
_vdu
_interfaces
(
4316 tasks_by_target_record_id
,
4320 _call_mock_vld_information_of_interface
= (
4321 mock_vld_information_of_interface
.call_args_list
4324 _call_mock_vld_information_of_interface
[0][0],
4325 (interface_1
, ns_preffix
, vnf_preffix
),
4328 _call_mock_port_security
= mock_port_security
.call_args_list
4329 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4331 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4332 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4334 mock_type_of_interface
.assert_not_called()
4335 self
.logger
.error
.assert_not_called()
4336 self
.assertEqual(net_list
, [])
4337 self
.assertEqual(extra_dict
, expected_extra_dict
)
4339 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4340 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4341 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4342 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4343 def test_prepare_vdu_interfaces_vld_information_is_empty(
4345 mock_type_of_interface
,
4346 mock_item_of_interface
,
4348 mock_vld_information_of_interface
,
4350 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4351 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4354 "ns-vld-id": "net1",
4355 "ip-address": "13.2.12.31",
4356 "mgmt-interface": True,
4360 "vnf-vld-id": "net2",
4361 "mac-address": "d0:94:66:ed:fc:e2",
4365 "ns-vld-id": "mgmtnet",
4367 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4369 "params": "test_params",
4370 "find_params": "test_find_params",
4373 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4375 self
.ns
._prepare
_vdu
_interfaces
(
4381 tasks_by_target_record_id
,
4385 _call_mock_vld_information_of_interface
= (
4386 mock_vld_information_of_interface
.call_args_list
4389 _call_mock_vld_information_of_interface
[0][0],
4390 (interface_1
, ns_preffix
, vnf_preffix
),
4393 _call_mock_vld_information_of_interface
[1][0],
4394 (interface_2
, ns_preffix
, vnf_preffix
),
4397 _call_mock_vld_information_of_interface
[2][0],
4398 (interface_3
, ns_preffix
, vnf_preffix
),
4401 _call_logger
= self
.logger
.error
.call_args_list
4404 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4408 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4412 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4414 self
.assertEqual(net_list
, [])
4418 "params": "test_params",
4419 "find_params": "test_find_params",
4424 mock_item_of_interface
.assert_not_called()
4425 mock_port_security
.assert_not_called()
4426 mock_type_of_interface
.assert_not_called()
4428 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4429 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4430 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4431 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4432 def test_prepare_vdu_interfaces_empty_interface_list(
4434 mock_type_of_interface
,
4435 mock_item_of_interface
,
4437 mock_vld_information_of_interface
,
4439 """Prepare vdu interfaces, interface list is empty."""
4440 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4441 target_vdu
["interfaces"] = []
4444 self
.ns
._prepare
_vdu
_interfaces
(
4450 tasks_by_target_record_id
,
4453 mock_type_of_interface
.assert_not_called()
4454 mock_vld_information_of_interface
.assert_not_called()
4455 mock_item_of_interface
.assert_not_called()
4456 mock_port_security
.assert_not_called()
def test_prepare_vdu_ssh_keys(self):
    """Target_vdu has ssh-keys, requires ssh access, and an RO NSR key is given.

    Both the VDU key and the RO NSR public key are expected under "key-pairs".
    """
    nsr_key = {"public_key": "path_of_public_key"}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"ssh-keys": ["sample-ssh-key"], "ssh-access-required": True})
    config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, nsr_key, config)
    self.assertDictEqual(
        config,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )
def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """Target_vdu does not have ssh-keys but requires ssh access.

    Only the RO NSR public key is expected under "key-pairs".
    """
    nsr_key = {"public_key": "path_of_public_key"}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-access-required"] = True
    config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, nsr_key, config)
    self.assertDictEqual(
        config, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )
def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """Target_vdu has ssh-keys and ssh access is not required.

    Only the VDU's own key is expected under "key-pairs".
    """
    nsr_key = {"public_key": "path_of_public_key"}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"ssh-keys": ["sample-ssh-key"], "ssh-access-required": False})
    config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, nsr_key, config)
    self.assertDictEqual(config, {"key-pairs": ["sample-ssh-key"]})
4492 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4493 def test_add_persistent_root_disk_to_disk_list(
4494 self
, mock_select_persistent_root_disk
4496 """Add persistent root disk to disk_list"""
4498 "id": "persistent-root-volume",
4499 "type-of-storage": "persistent-storage:persistent-storage",
4500 "size-of-storage": "10",
4502 mock_select_persistent_root_disk
.return_value
= root_disk
4503 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4504 vnfd
["virtual-storage-desc"][1] = root_disk
4505 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4506 persistent_root_disk
= {}
4508 expected_disk_list
= [
4510 "image_id": "ubuntu20.04",
4514 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4515 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4517 self
.assertEqual(disk_list
, expected_disk_list
)
4518 mock_select_persistent_root_disk
.assert_called_once()
4520 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4521 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4522 self
, mock_select_persistent_root_disk
4524 """Add persistent root disk to disk_list"""
4526 "id": "persistent-root-volume",
4527 "type-of-storage": "persistent-storage:persistent-storage",
4528 "size-of-storage": "10",
4530 mock_select_persistent_root_disk
.side_effect
= AttributeError
4531 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4532 vnfd
["virtual-storage-desc"][1] = root_disk
4533 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4534 persistent_root_disk
= {}
4536 with self
.assertRaises(AttributeError):
4537 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4538 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4540 self
.assertEqual(disk_list
, [])
4541 mock_select_persistent_root_disk
.assert_called_once()
4543 def test_add_persistent_ordinary_disk_to_disk_list(self
):
4544 """Add persistent ordinary disk to disk_list"""
4545 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4546 persistent_root_disk
= {
4547 "persistent-root-volume": {
4548 "image_id": "ubuntu20.04",
4552 persistent_ordinary_disk
= {}
4554 expected_disk_list
= [
4559 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4560 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4562 self
.assertEqual(disk_list
, expected_disk_list
)
4564 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(self
):
4565 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4566 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4567 persistent_root_disk
= {
4568 "persistent-root-volume": {
4569 "image_id": "ubuntu20.04",
4572 "persistent-volume2": {
4576 persistent_ordinary_disk
= {}
4579 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4580 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4582 self
.assertEqual(disk_list
, [])
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_select_persistent_root_disk
):
    """VNFD does not have persistent storage, so no disk is added."""
    # The patched selector reports that no persistent root disk exists.
    mock_select_persistent_root_disk.return_value = None
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    root_disks = {}
    disks = []
    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )
    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_select_persistent_root_disk
):
    """Persistent_root_disk dict is empty and the selector finds no root disk."""
    # The patched selector reports that no persistent root disk exists.
    mock_select_persistent_root_disk.return_value = None
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    root_disks = {}
    disks = []
    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )
    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
    """Invalid extra dict: the call must raise NsException with a fixed message."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
    # NOTE(review): this assignment was restored from context - confirm the value.
    extra_dict = {}
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)
    # Checked after the "with" block: a statement placed after the raising call
    # inside the block would never run, and err.exception is only available
    # once the context manager has exited.
    self.assertEqual(str(err.exception), "Invalid extra_dict format.")
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A single affinity group gives one list entry and one dependency."""
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_id = "sample_affinity-group-id"
    # Text expected both in "depends_on" and, "TASK-"-prefixed, in the result.
    group_text = f"{ns_preffix}:affinity-or-anti-affinity-group.{group_id}"

    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [group_id]
    dependencies = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, dependencies, ns_preffix)

    self.assertDictEqual(dependencies, {"depends_on": [group_text]})
    self.assertEqual(groups, [{"affinity_group_id": "TASK-" + group_text}])
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """Two affinity groups give two list entries and two dependencies, in order."""
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_ids = ["affinity-group-id1", "affinity-group-id2"]
    # One text per group, in the same order as the ids on the VDU.
    group_texts = [
        f"{ns_preffix}:affinity-or-anti-affinity-group.{group_id}"
        for group_id in group_ids
    ]

    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = list(group_ids)
    dependencies = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, dependencies, ns_preffix)

    self.assertDictEqual(dependencies, {"depends_on": group_texts})
    self.assertEqual(
        groups, [{"affinity_group_id": "TASK-" + text} for text in group_texts]
    )
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """No affinity group id is assigned to the VDU by this test.

    The helper is expected to return an empty list and to leave
    ``extra_dict["depends_on"]`` empty.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    dependencies = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    )

    self.assertDictEqual(dependencies, {"depends_on": []})
    self.assertEqual(affinity_groups, [])
# Exercises Ns._process_vdu_params for a VDU whose additionalParams carry an
# OSM "vdu_volumes" list (one volume identified by "vim-volume-id").
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
# NOTE(review): the docstring says the instantiation volume list is empty,
# but the list built below holds one entry - confirm the intended wording.
4674 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4675 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4676 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4677 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4678 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4679 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4680 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4681 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4682 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4683 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4684 def test_process_vdu_params_with_inst_vol_list(
4686 mock_prepare_vdu_affinity_group_list
,
4687 mock_add_persistent_ordinary_disks_to_disk_list
,
4688 mock_add_persistent_root_disk_to_disk_list
,
4689 mock_find_persistent_volumes
,
4690 mock_find_persistent_root_volumes
,
4691 mock_prepare_vdu_ssh_keys
,
4692 mock_prepare_vdu_cloud_init
,
4693 mock_prepare_vdu_interfaces
,
4694 mock_locate_vdu_interfaces
,
4695 mock_sort_vdu_interfaces
,
4697 """Instantiation volume list is empty."""
4698 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4700 target_vdu
["interfaces"] = interfaces_wth_all_positions
# The volume list is handed over through additionalParams -> OSM -> vdu_volumes.
4702 vdu_instantiation_vol_list
= [
4704 "vim-volume-id": vim_volume_id
,
4705 "name": "persistent-volume2",
4708 target_vdu
["additionalParams"] = {
4709 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4711 mock_prepare_vdu_cloud_init
.return_value
= {}
4712 mock_prepare_vdu_affinity_group_list
.return_value
= []
# Value the mocked find_persistent_root_volumes hands back; it is expected
# to be forwarded to find_persistent_volumes (asserted at the end).
4713 persistent_root_disk
= {
4714 "persistent-root-volume": {
4715 "image_id": "ubuntu20.04",
4719 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4721 new_kwargs
= deepcopy(kwargs
)
4726 "tasks_by_target_record_id": {},
4730 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4731 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4732 db
.get_one
.return_value
= vnfd
4733 result
= Ns
._process
_vdu
_params
(
4734 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# The sort helper is expected to run and the partial-locate helper to be skipped.
4736 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4737 mock_locate_vdu_interfaces
.assert_not_called()
4738 mock_prepare_vdu_cloud_init
.assert_called_once()
# With instantiation volumes supplied, neither disk-list helper is expected
# to be called; find_persistent_volumes is asserted instead.
4739 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4740 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4741 mock_prepare_vdu_interfaces
.assert_called_once_with(
4743 expected_extra_dict_copy
,
4750 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4751 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4752 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4753 mock_find_persistent_volumes
.assert_called_once_with(
4754 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
# Exercises Ns._process_vdu_params when the mocked
# _prepare_vdu_affinity_group_list returns a non-empty list; the result is
# expected to carry that list under params -> affinity_group_list.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
# NOTE(review): the docstring mentions cloud-config although the mocked
# cloud-init result is empty and the test targets affinity groups - looks
# copy-pasted, confirm.
4757 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4758 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4759 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4760 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4761 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4762 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4763 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4764 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4765 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4766 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4767 def test_process_vdu_params_wth_affinity_groups(
4769 mock_prepare_vdu_affinity_group_list
,
4770 mock_add_persistent_ordinary_disks_to_disk_list
,
4771 mock_add_persistent_root_disk_to_disk_list
,
4772 mock_find_persistent_volumes
,
4773 mock_find_persistent_root_volumes
,
4774 mock_prepare_vdu_ssh_keys
,
4775 mock_prepare_vdu_cloud_init
,
4776 mock_prepare_vdu_interfaces
,
4777 mock_locate_vdu_interfaces
,
4778 mock_sort_vdu_interfaces
,
4780 """There is cloud-config."""
4781 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4784 target_vdu
["interfaces"] = interfaces_wth_all_positions
4785 mock_prepare_vdu_cloud_init
.return_value
= {}
4786 mock_prepare_vdu_affinity_group_list
.return_value
= [
4791 new_kwargs
= deepcopy(kwargs
)
4796 "tasks_by_target_record_id": {},
# Expected result: the shared expected_extra_dict2 fixture plus the
# affinity group list under "params".
4800 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4801 expected_extra_dict3
["params"]["affinity_group_list"] = [
4805 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4806 db
.get_one
.return_value
= vnfd
4807 result
= Ns
._process
_vdu
_params
(
4808 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4810 self
.assertDictEqual(result
, expected_extra_dict3
)
# The sort helper is expected to run and the partial-locate helper to be skipped.
4811 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4812 mock_locate_vdu_interfaces
.assert_not_called()
4813 mock_prepare_vdu_cloud_init
.assert_called_once()
4814 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4815 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4816 mock_prepare_vdu_interfaces
.assert_called_once_with(
4818 expected_extra_dict3
,
4826 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4827 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4828 mock_find_persistent_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when the mocked _prepare_vdu_cloud_init
# returns a cloud-config dict ("user-data" and "boot-data-drive" are the
# visible keys). The same dict is expected under params -> cloud_config in
# the result and as the third argument of _prepare_vdu_ssh_keys.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
4830 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4831 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4832 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4833 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4834 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4835 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4836 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4837 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4838 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4839 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4840 def test_process_vdu_params_wth_cloud_config(
4842 mock_prepare_vdu_affinity_group_list
,
4843 mock_add_persistent_ordinary_disks_to_disk_list
,
4844 mock_add_persistent_root_disk_to_disk_list
,
4845 mock_find_persistent_volumes
,
4846 mock_find_persistent_root_volumes
,
4847 mock_prepare_vdu_ssh_keys
,
4848 mock_prepare_vdu_cloud_init
,
4849 mock_prepare_vdu_interfaces
,
4850 mock_locate_vdu_interfaces
,
4851 mock_sort_vdu_interfaces
,
4853 """There is cloud-config."""
4854 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4857 target_vdu
["interfaces"] = interfaces_wth_all_positions
# Cloud-config handed back by the mocked cloud-init helper.
4858 mock_prepare_vdu_cloud_init
.return_value
= {
4859 "user-data": user_data
,
4860 "boot-data-drive": "vda",
4862 mock_prepare_vdu_affinity_group_list
.return_value
= []
4864 new_kwargs
= deepcopy(kwargs
)
4869 "tasks_by_target_record_id": {},
# Expected result: the shared expected_extra_dict2 fixture plus the
# cloud-config under "params".
4873 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4874 expected_extra_dict3
["params"]["cloud_config"] = {
4875 "user-data": user_data
,
4876 "boot-data-drive": "vda",
4878 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4879 db
.get_one
.return_value
= vnfd
4880 result
= Ns
._process
_vdu
_params
(
4881 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# The sort helper is expected to run and the partial-locate helper to be skipped.
4883 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4884 mock_locate_vdu_interfaces
.assert_not_called()
4885 mock_prepare_vdu_cloud_init
.assert_called_once()
4886 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4887 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4888 mock_prepare_vdu_interfaces
.assert_called_once_with(
4890 expected_extra_dict3
,
4897 self
.assertDictEqual(result
, expected_extra_dict3
)
4898 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
4899 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
4901 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4902 mock_find_persistent_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params with the target_vdu_wthout_persistent_storage
# and vnfd_wthout_persistent_storage fixtures. The result is expected to
# equal a copy of expected_extra_dict2; both disk-list helpers are expected
# to be called once and find_persistent_volumes not at all.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
4904 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4905 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4906 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4907 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4908 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4909 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4910 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4911 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4912 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4913 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4914 def test_process_vdu_params_wthout_persistent_storage(
4916 mock_prepare_vdu_affinity_group_list
,
4917 mock_add_persistent_ordinary_disks_to_disk_list
,
4918 mock_add_persistent_root_disk_to_disk_list
,
4919 mock_find_persistent_volumes
,
4920 mock_find_persistent_root_volumes
,
4921 mock_prepare_vdu_ssh_keys
,
4922 mock_prepare_vdu_cloud_init
,
4923 mock_prepare_vdu_interfaces
,
4924 mock_locate_vdu_interfaces
,
4925 mock_sort_vdu_interfaces
,
4927 """There is not any persistent storage."""
4928 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4931 target_vdu
["interfaces"] = interfaces_wth_all_positions
4932 mock_prepare_vdu_cloud_init
.return_value
= {}
4933 mock_prepare_vdu_affinity_group_list
.return_value
= []
4935 new_kwargs
= deepcopy(kwargs
)
4940 "tasks_by_target_record_id": {},
4944 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
4945 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4946 db
.get_one
.return_value
= vnfd
4947 result
= Ns
._process
_vdu
_params
(
4948 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# The sort helper is expected to run and the partial-locate helper to be skipped.
4950 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4951 mock_locate_vdu_interfaces
.assert_not_called()
4952 mock_prepare_vdu_cloud_init
.assert_called_once()
4953 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4954 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4955 mock_prepare_vdu_interfaces
.assert_called_once_with(
4957 expected_extra_dict_copy
,
4964 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4965 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4966 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4967 mock_find_persistent_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params with an inline interface list in which
# only some entries carry a "position" key. The partial-locate helper is
# expected to be called with target_vdu and the sort helper not at all.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
4969 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4970 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4971 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4972 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4973 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4974 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4975 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4976 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4977 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4978 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4979 def test_process_vdu_params_interfaces_partially_located(
4981 mock_prepare_vdu_affinity_group_list
,
4982 mock_add_persistent_ordinary_disks_to_disk_list
,
4983 mock_add_persistent_root_disk_to_disk_list
,
4984 mock_find_persistent_volumes
,
4985 mock_find_persistent_root_volumes
,
4986 mock_prepare_vdu_ssh_keys
,
4987 mock_prepare_vdu_cloud_init
,
4988 mock_prepare_vdu_interfaces
,
4989 mock_locate_vdu_interfaces
,
4990 mock_sort_vdu_interfaces
,
4992 """Some interfaces have position."""
4993 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
# Interface list written inline for this scenario.
4996 target_vdu
["interfaces"] = [
4999 "ns-vld-id": "net1",
5001 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5004 "ns-vld-id": "mgmtnet",
5007 mock_prepare_vdu_cloud_init
.return_value
= {}
5008 mock_prepare_vdu_affinity_group_list
.return_value
= []
# Configured on the mock, although the test later asserts that
# find_persistent_root_volumes is never called.
5009 persistent_root_disk
= {
5010 "persistent-root-volume": {
5011 "image_id": "ubuntu20.04",
5015 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5017 new_kwargs
= deepcopy(kwargs
)
5022 "tasks_by_target_record_id": {},
5027 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5028 db
.get_one
.return_value
= vnfd
5029 result
= Ns
._process
_vdu
_params
(
5030 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5032 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
# Here the partial-locate helper is expected instead of the sort helper.
5033 mock_sort_vdu_interfaces
.assert_not_called()
5034 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5035 mock_prepare_vdu_cloud_init
.assert_called_once()
5036 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5037 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5038 mock_prepare_vdu_interfaces
.assert_called_once_with(
5040 expected_extra_dict_copy
,
5047 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5048 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5049 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5050 mock_find_persistent_volumes
.assert_not_called()
5051 mock_find_persistent_root_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params with the interfaces_wthout_positions
# fixture. The partial-locate helper is expected to be called with
# target_vdu and the sort helper not at all.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
5053 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5054 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5055 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5056 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5057 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5058 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5059 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5060 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5061 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5062 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5063 def test_process_vdu_params_no_interface_position(
5065 mock_prepare_vdu_affinity_group_list
,
5066 mock_add_persistent_ordinary_disks_to_disk_list
,
5067 mock_add_persistent_root_disk_to_disk_list
,
5068 mock_find_persistent_volumes
,
5069 mock_find_persistent_root_volumes
,
5070 mock_prepare_vdu_ssh_keys
,
5071 mock_prepare_vdu_cloud_init
,
5072 mock_prepare_vdu_interfaces
,
5073 mock_locate_vdu_interfaces
,
5074 mock_sort_vdu_interfaces
,
5076 """Interfaces do not have position."""
5077 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5080 target_vdu
["interfaces"] = interfaces_wthout_positions
5081 mock_prepare_vdu_cloud_init
.return_value
= {}
5082 mock_prepare_vdu_affinity_group_list
.return_value
= []
# Configured on the mock, although the test later asserts that
# find_persistent_root_volumes is never called.
5083 persistent_root_disk
= {
5084 "persistent-root-volume": {
5085 "image_id": "ubuntu20.04",
5089 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5090 new_kwargs
= deepcopy(kwargs
)
5095 "tasks_by_target_record_id": {},
5100 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5101 db
.get_one
.return_value
= vnfd
5102 result
= Ns
._process
_vdu
_params
(
5103 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5105 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
# Here the partial-locate helper is expected instead of the sort helper.
5106 mock_sort_vdu_interfaces
.assert_not_called()
5107 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5108 mock_prepare_vdu_cloud_init
.assert_called_once()
5109 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5110 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5111 mock_prepare_vdu_interfaces
.assert_called_once_with(
5113 expected_extra_dict_copy
,
5120 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5121 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5122 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5123 mock_find_persistent_volumes
.assert_not_called()
5124 mock_find_persistent_root_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when the mocked _prepare_vdu_interfaces
# raises TypeError (set through side_effect). The call is expected to raise,
# and the cloud-init, ssh-key, disk-list, affinity-group and
# persistent-volume helpers are expected not to be called.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
5126 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5127 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5128 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5129 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5130 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5131 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5132 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5133 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5134 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5135 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5136 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5138 mock_prepare_vdu_affinity_group_list
,
5139 mock_add_persistent_ordinary_disks_to_disk_list
,
5140 mock_add_persistent_root_disk_to_disk_list
,
5141 mock_find_persistent_volumes
,
5142 mock_find_persistent_root_volumes
,
5143 mock_prepare_vdu_ssh_keys
,
5144 mock_prepare_vdu_cloud_init
,
5145 mock_prepare_vdu_interfaces
,
5146 mock_locate_vdu_interfaces
,
5147 mock_sort_vdu_interfaces
,
5149 """Prepare vdu interfaces method raises exception."""
5150 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5153 target_vdu
["interfaces"] = interfaces_wthout_positions
5154 mock_prepare_vdu_cloud_init
.return_value
= {}
5155 mock_prepare_vdu_affinity_group_list
.return_value
= []
5156 persistent_root_disk
= {
5157 "persistent-root-volume": {
5158 "image_id": "ubuntu20.04",
5162 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5163 new_kwargs
= deepcopy(kwargs
)
5168 "tasks_by_target_record_id": {},
# Failure injected into the interface-preparation step.
5172 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5174 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5175 db
.get_one
.return_value
= vnfd
5176 with self
.assertRaises(Exception) as err
:
5177 Ns
._process
_vdu
_params
(
5178 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# NOTE(review): `err` is the assertRaises context object, so type(err) is
# never TypeError; the raised exception is `err.exception`. Consider
# assertRaises(TypeError) or checking type(err.exception) - confirm.
5180 self
.assertEqual(type(err
), TypeError)
5181 mock_sort_vdu_interfaces
.assert_not_called()
5182 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
# Steps that are expected to be skipped once interface preparation fails.
5183 mock_prepare_vdu_cloud_init
.assert_not_called()
5184 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5185 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5186 mock_prepare_vdu_interfaces
.assert_called_once()
5187 mock_prepare_vdu_ssh_keys
.assert_not_called()
5188 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5189 mock_find_persistent_volumes
.assert_not_called()
5190 mock_find_persistent_root_volumes
.assert_not_called()
# Exercises Ns._process_vdu_params when the mocked
# _add_persistent_root_disk_to_disk_list raises KeyError (set through
# side_effect). The call is expected to raise; the ordinary-disk,
# affinity-group and persistent-volume helpers are expected not to be called.
# Every Ns helper named in the @patch decorators is replaced by a mock; the
# mocks are received in reverse decorator order (bottom decorator first).
5192 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5193 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5194 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5195 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5196 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5197 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5198 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5199 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5200 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5201 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5202 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5204 mock_prepare_vdu_affinity_group_list
,
5205 mock_add_persistent_ordinary_disks_to_disk_list
,
5206 mock_add_persistent_root_disk_to_disk_list
,
5207 mock_find_persistent_volumes
,
5208 mock_find_persistent_root_volumes
,
5209 mock_prepare_vdu_ssh_keys
,
5210 mock_prepare_vdu_cloud_init
,
5211 mock_prepare_vdu_interfaces
,
5212 mock_locate_vdu_interfaces
,
5213 mock_sort_vdu_interfaces
,
5215 """Add persistent root disk method raises exception."""
5216 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5219 target_vdu
["interfaces"] = interfaces_wthout_positions
5220 mock_prepare_vdu_cloud_init
.return_value
= {}
5221 mock_prepare_vdu_affinity_group_list
.return_value
= []
# Failure injected into the root-disk step.
5222 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5223 new_kwargs
= deepcopy(kwargs
)
5228 "tasks_by_target_record_id": {},
5233 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5234 db
.get_one
.return_value
= vnfd
5235 with self
.assertRaises(Exception) as err
:
5236 Ns
._process
_vdu
_params
(
5237 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
# NOTE(review): `err` is the assertRaises context object, so type(err) is
# never KeyError; the raised exception is `err.exception`. Consider
# assertRaises(KeyError) or checking type(err.exception) - confirm.
5239 self
.assertEqual(type(err
), KeyError)
5240 mock_sort_vdu_interfaces
.assert_not_called()
5241 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
# Steps asserted as already executed before the root-disk failure.
5242 mock_prepare_vdu_cloud_init
.assert_called_once()
5243 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5244 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5245 mock_prepare_vdu_interfaces
.assert_called_once_with(
5249 f
"{ns_preffix}:image.0",
5250 f
"{ns_preffix}:flavor.0",
5260 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5261 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5262 mock_find_persistent_volumes
.assert_not_called()
5263 mock_find_persistent_root_volumes
.assert_not_called()
# Calls Ns._select_persistent_root_disk with a VDU whose virtual-storage-desc
# list starts with "persistent-root-volume" and expects the passed vsd to be
# returned unchanged.
5265 def test_select_persistent_root_disk(self
):
5266 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5267 vdu
["virtual-storage-desc"] = [
5268 "persistent-root-volume",
5269 "persistent-volume2",
# vsd is the entry at index 1 of the VNFD's virtual-storage-desc list
# (presumably the "persistent-root-volume" descriptor - verify against the fixture).
5272 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5273 expected_result
= vsd
5274 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5275 self
.assertEqual(result
, expected_result
)
# Calls Ns._select_persistent_root_disk with a VDU whose virtual-storage-desc
# list starts with "persistent-volume2" and expects None.
5277 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5278 """VDU first virtual-storage-desc is different than vsd id."""
5279 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5280 vdu
["virtual-storage-desc"] = [
5281 "persistent-volume2",
5282 "persistent-root-volume",
# vsd is the entry at index 1 of the VNFD's virtual-storage-desc list
# (presumably the "persistent-root-volume" descriptor - verify against the fixture).
5285 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5286 expected_result
= None
5287 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5288 self
.assertEqual(result
, expected_result
)
# Calls Ns._select_persistent_root_disk with a vsd whose "type-of-storage" is
# overridden to an ephemeral-storage value and expects None.
# NOTE(review): the VDU list below also starts with "persistent-volume2",
# so the None result may not isolate the storage-type check - confirm.
5290 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5291 """vsd type is not persistent."""
5292 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5293 vdu
["virtual-storage-desc"] = [
5294 "persistent-volume2",
5295 "persistent-root-volume",
5298 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
# Override applied on the deep copy only; the module-level fixture is untouched.
5299 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5300 expected_result
= None
5301 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5302 self
.assertEqual(result
, expected_result
)
# Calls Ns._select_persistent_root_disk with a vsd whose "size-of-storage" is
# set to None and expects None.
# NOTE(review): the VDU list below also starts with "persistent-volume2",
# so the None result may not isolate the missing-size check - confirm.
5304 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5305 """vsd size is None."""
5306 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5307 vdu
["virtual-storage-desc"] = [
5308 "persistent-volume2",
5309 "persistent-root-volume",
5312 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
# Override applied on the deep copy only; the module-level fixture is untouched.
5313 vsd
["size-of-storage"] = None
5314 expected_result
= None
5315 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5316 self
.assertEqual(result
, expected_result
)
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """No root disk is selected when the VDU fixture is used unmodified.

    Unlike the sibling tests, no ``virtual-storage-desc`` list is assigned to
    the VDU here; the selector is expected to return ``None``.
    """
    vdu_record = deepcopy(target_vdu_wth_persistent_storage)
    storage_descriptors = deepcopy(vnfd_wth_persistent_storage)[
        "virtual-storage-desc"
    ]
    # Index 1 of the VNFD storage descriptors is the one handed to the selector.
    candidate_vsd = storage_descriptors[1]

    selected = Ns._select_persistent_root_disk(candidate_vsd, vdu_record)

    self.assertEqual(selected, None)
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """Passing the whole descriptor list instead of one entry must fail.

    The selector receives the VNFD's full ``virtual-storage-desc`` value (not
    a single item of it) and is expected to raise ``AttributeError``.
    """
    vdu_record = deepcopy(target_vdu_wth_persistent_storage)
    all_descriptors = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]

    self.assertRaises(
        AttributeError,
        Ns._select_persistent_root_disk,
        all_descriptors,
        vdu_record,
    )