1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
74 "vdu-storage-requirements": [
75 {"key": "keep-volume", "value": "true"},
79 "id": "ephemeral-volume",
80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
81 "size-of-storage": "1",
87 "path": "/app/storage/",
92 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
93 task_by_target_record_id
= {
94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
95 "extra_dict": {"params": {"net_type": "SR-IOV"}}
98 interfaces_wthout_positions
= [
109 "ns-vld-id": "mgmtnet",
112 interfaces_wth_all_positions
= [
125 "ns-vld-id": "mgmtnet",
129 target_vdu_wth_persistent_storage
= {
130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
133 "vdu-name": "several_volumes-VM",
137 "ns-vld-id": "mgmtnet",
140 "virtual-storages": [
142 "id": "persistent-volume2",
143 "size-of-storage": "10",
144 "type-of-storage": "persistent-storage:persistent-storage",
147 "id": "persistent-root-volume",
148 "size-of-storage": "10",
149 "type-of-storage": "persistent-storage:persistent-storage",
150 "vdu-storage-requirements": [
151 {"key": "keep-volume", "value": "true"},
155 "id": "ephemeral-volume",
156 "size-of-storage": "1",
157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
# Module-level mock objects.
db = MagicMock(name="database mock")
# Labelled differently from `db` so the two mocks can be told apart in
# reprs and assertion-failure output (both used to be "database mock").
# NOTE(review): `fs` presumably stands for the filesystem/storage handle — confirm.
fs = MagicMock(name="filesystem mock")

# Raw record identifiers.
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"

# Prefixed forms are derived from the raw identifiers so the two
# representations cannot drift apart; the resulting values are unchanged.
ns_preffix = f"nsrs:{nsr_id}"
vnf_preffix = f"vnfrs:{vnfr_id}"
168 "name": "sample_name",
170 expected_extra_dict
= {
172 f
"{ns_preffix}:image.0",
173 f
"{ns_preffix}:flavor.0",
176 "affinity_group_list": [],
177 "availability_zone_index": None,
178 "availability_zone_list": None,
179 "cloud_config": None,
180 "description": "several_volumes-VM",
182 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
183 "image_id": f
"TASK-{ns_preffix}:image.0",
184 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
190 expected_extra_dict2
= {
192 f
"{ns_preffix}:image.0",
193 f
"{ns_preffix}:flavor.0",
196 "affinity_group_list": [],
197 "availability_zone_index": None,
198 "availability_zone_list": None,
199 "cloud_config": None,
200 "description": "without_volumes-VM",
202 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
203 "image_id": f
"TASK-{ns_preffix}:image.0",
204 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
210 tasks_by_target_record_id
= {
211 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
214 "net_type": "SR-IOV",
221 "vdu2cloud_init": {},
223 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
224 "member-vnf-index-ref": "vnf-several-volumes",
227 vnfd_wthout_persistent_storage
= {
228 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
232 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
235 "id": "without_volumes-vnf",
236 "product-name": "without_volumes-vnf",
239 "id": "without_volumes-VM",
240 "name": "without_volumes-VM",
241 "sw-image-desc": "ubuntu20.04",
242 "alternative-sw-image-desc": [
246 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
250 "virtual-storage-desc": [
251 {"id": "root-volume", "size-of-storage": "10"},
253 "id": "ephemeral-volume",
254 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
255 "size-of-storage": "1",
261 "path": "/app/storage/",
267 target_vdu_wthout_persistent_storage
= {
268 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
271 "vdu-name": "without_volumes-VM",
275 "ns-vld-id": "mgmtnet",
278 "virtual-storages": [
281 "size-of-storage": "10",
284 "id": "ephemeral-volume",
285 "size-of-storage": "1",
286 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
290 cloud_init_content
= """
295 overwrite: {{is_override}}
298 - [ sh, -xc, "echo $(date) '{{command}}'" ]
309 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
# Identifiers reused by the expected-result fixtures defined further down
# in this module (vertical scale, migrate, rebuild/start/stop).
vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
nsr_id_2 = "993166fe-723e-4680-ac4b-b1af2541ae31"
# Built from vnf_id and target_vim instead of repeating both identifiers in
# a second literal; the resulting string is identical to the former constant.
target_record_1 = f"vnfrs:{vnf_id}:vdur.1.vim_info.{target_vim}"
318 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
319 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
321 expected_result_vertical_scale
= {
322 "target_id": target_vim
,
323 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
324 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
325 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
326 "status": "SCHEDULED",
328 "item": "verticalscale",
329 "target_record": target_record_1
,
330 "target_record_id": target_record_id
,
332 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
333 "flavor_dict": "flavor_dict",
338 "vim_info": {target_vim
: {"interfaces": []}},
340 vnf
= {"_id": vnf_id
}
341 extra_dict_vertical_scale
= {
343 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
344 "flavor_dict": "flavor_dict",
347 extra_dict_migrate
= {
349 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
350 "migrate_host": "migrateToHost",
353 expected_result_migrate
= {
354 "target_id": target_vim
,
355 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
356 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
357 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
358 "status": "SCHEDULED",
361 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
362 "target_record_id": target_record_id
,
364 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
365 "migrate_host": "migrateToHost",
368 expected_result_rebuild_start_stop
= {
369 "target_id": target_vim
,
370 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
371 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
372 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
373 "status": "SCHEDULED",
376 "target_record_id": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
380 class TestException(Exception):
class CopyingMock(MagicMock):
    """MagicMock variant that is called with deep copies of its arguments.

    The positional and keyword arguments are deep-copied before being passed
    on to ``MagicMock.__call__``, so objects the caller mutates after the call
    are not the same objects the mock received.
    """

    def __call__(self, *args, **kwargs):
        # Copy the two argument collections independently, then delegate.
        positional = deepcopy(args)
        keyword = deepcopy(kwargs)
        return super().__call__(*positional, **keyword)
391 class TestNs(unittest
.TestCase
):
395 def test__create_task_without_extra_dict(self
):
397 "target_id": "vim_openstack_1",
398 "action_id": "123456",
400 "task_id": "123456:1",
401 "status": "SCHEDULED",
404 "target_record": "test_target_record",
405 "target_record_id": "test_target_record_id",
408 "action_id": "123456",
413 task
= Ns
._create
_task
(
414 deployment_info
=deployment_info
,
415 target_id
="vim_openstack_1",
418 target_record
="test_target_record",
419 target_record_id
="test_target_record_id",
422 self
.assertEqual(deployment_info
.get("task_index"), 2)
423 self
.assertDictEqual(task
, expected_result
)
425 def test__create_task(self
):
427 "target_id": "vim_openstack_1",
428 "action_id": "123456",
430 "task_id": "123456:1",
431 "status": "SCHEDULED",
434 "target_record": "test_target_record",
435 "target_record_id": "test_target_record_id",
436 # values coming from extra_dict
437 "params": "test_params",
438 "find_params": "test_find_params",
439 "depends_on": "test_depends_on",
442 "action_id": "123456",
447 task
= Ns
._create
_task
(
448 deployment_info
=deployment_info
,
449 target_id
="vim_openstack_1",
452 target_record
="test_target_record",
453 target_record_id
="test_target_record_id",
455 "params": "test_params",
456 "find_params": "test_find_params",
457 "depends_on": "test_depends_on",
461 self
.assertEqual(deployment_info
.get("task_index"), 2)
462 self
.assertDictEqual(task
, expected_result
)
464 @patch("osm_ng_ro.ns.time")
465 def test__create_ro_task(self
, mock_time
: Mock
):
466 now
= 1637324838.994551
467 mock_time
.return_value
= now
469 "target_id": "vim_openstack_1",
470 "action_id": "123456",
472 "task_id": "123456:1",
473 "status": "SCHEDULED",
476 "target_record": "test_target_record",
477 "target_record_id": "test_target_record_id",
478 # values coming from extra_dict
479 "params": "test_params",
480 "find_params": "test_find_params",
481 "depends_on": "test_depends_on",
487 "target_id": "vim_openstack_1",
490 "created_items": None,
504 ro_task
= Ns
._create
_ro
_task
(
505 target_id
="vim_openstack_1",
509 self
.assertDictEqual(ro_task
, expected_result
)
511 def test__process_image_params_with_empty_target_image(self
):
517 result
= Ns
._process
_image
_params
(
518 target_image
=target_image
,
521 target_record_id
=None,
524 self
.assertDictEqual(expected_result
, result
)
526 def test__process_image_params_with_wrong_target_image(self
):
531 "no_image": "to_see_here",
534 result
= Ns
._process
_image
_params
(
535 target_image
=target_image
,
538 target_record_id
=None,
541 self
.assertDictEqual(expected_result
, result
)
543 def test__process_image_params_with_image(self
):
555 result
= Ns
._process
_image
_params
(
556 target_image
=target_image
,
559 target_record_id
=None,
562 self
.assertDictEqual(expected_result
, result
)
564 def test__process_image_params_with_vim_image_id(self
):
573 "vim_image_id": "123456",
576 result
= Ns
._process
_image
_params
(
577 target_image
=target_image
,
580 target_record_id
=None,
583 self
.assertDictEqual(expected_result
, result
)
585 def test__process_image_params_with_image_checksum(self
):
589 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
594 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
597 result
= Ns
._process
_image
_params
(
598 target_image
=target_image
,
601 target_record_id
=None,
604 self
.assertDictEqual(expected_result
, result
)
606 def test__get_resource_allocation_params_with_empty_target_image(self
):
608 quota_descriptor
= {}
610 result
= Ns
._get
_resource
_allocation
_params
(
611 quota_descriptor
=quota_descriptor
,
614 self
.assertDictEqual(expected_result
, result
)
616 def test__get_resource_allocation_params_with_wrong_target_image(self
):
619 "no_quota": "present_here",
622 result
= Ns
._get
_resource
_allocation
_params
(
623 quota_descriptor
=quota_descriptor
,
626 self
.assertDictEqual(expected_result
, result
)
628 def test__get_resource_allocation_params_with_limit(self
):
636 result
= Ns
._get
_resource
_allocation
_params
(
637 quota_descriptor
=quota_descriptor
,
640 self
.assertDictEqual(expected_result
, result
)
642 def test__get_resource_allocation_params_with_reserve(self
):
650 result
= Ns
._get
_resource
_allocation
_params
(
651 quota_descriptor
=quota_descriptor
,
654 self
.assertDictEqual(expected_result
, result
)
656 def test__get_resource_allocation_params_with_shares(self
):
664 result
= Ns
._get
_resource
_allocation
_params
(
665 quota_descriptor
=quota_descriptor
,
668 self
.assertDictEqual(expected_result
, result
)
670 def test__get_resource_allocation_params(self
):
682 result
= Ns
._get
_resource
_allocation
_params
(
683 quota_descriptor
=quota_descriptor
,
686 self
.assertDictEqual(expected_result
, result
)
688 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
689 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
697 result
= Ns
._process
_guest
_epa
_quota
_params
(
698 guest_epa_quota
=guest_epa_quota
,
699 epa_vcpu_set
=epa_vcpu_set
,
702 self
.assertDictEqual(expected_result
, result
)
703 self
.assertFalse(resource_allocation
.called
)
705 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
706 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
714 result
= Ns
._process
_guest
_epa
_quota
_params
(
715 guest_epa_quota
=guest_epa_quota
,
716 epa_vcpu_set
=epa_vcpu_set
,
719 self
.assertDictEqual(expected_result
, result
)
720 self
.assertFalse(resource_allocation
.called
)
722 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
723 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
729 "no-quota": "nothing",
733 result
= Ns
._process
_guest
_epa
_quota
_params
(
734 guest_epa_quota
=guest_epa_quota
,
735 epa_vcpu_set
=epa_vcpu_set
,
738 self
.assertDictEqual(expected_result
, result
)
739 self
.assertFalse(resource_allocation
.called
)
741 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
742 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
748 "no-quota": "nothing",
752 result
= Ns
._process
_guest
_epa
_quota
_params
(
753 guest_epa_quota
=guest_epa_quota
,
754 epa_vcpu_set
=epa_vcpu_set
,
757 self
.assertDictEqual(expected_result
, result
)
758 self
.assertFalse(resource_allocation
.called
)
760 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
761 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
775 result
= Ns
._process
_guest
_epa
_quota
_params
(
776 guest_epa_quota
=guest_epa_quota
,
777 epa_vcpu_set
=epa_vcpu_set
,
780 self
.assertDictEqual(expected_result
, result
)
781 self
.assertFalse(resource_allocation
.called
)
783 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
784 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
804 resource_allocation_param
= {
809 resource_allocation
.return_value
= {
815 result
= Ns
._process
_guest
_epa
_quota
_params
(
816 guest_epa_quota
=guest_epa_quota
,
817 epa_vcpu_set
=epa_vcpu_set
,
820 resource_allocation
.assert_called_once_with(resource_allocation_param
)
821 self
.assertDictEqual(expected_result
, result
)
823 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
824 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
844 resource_allocation_param
= {
849 resource_allocation
.return_value
= {
855 result
= Ns
._process
_guest
_epa
_quota
_params
(
856 guest_epa_quota
=guest_epa_quota
,
857 epa_vcpu_set
=epa_vcpu_set
,
860 resource_allocation
.assert_called_once_with(resource_allocation_param
)
861 self
.assertDictEqual(expected_result
, result
)
863 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
864 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
884 resource_allocation_param
= {
889 resource_allocation
.return_value
= {
895 result
= Ns
._process
_guest
_epa
_quota
_params
(
896 guest_epa_quota
=guest_epa_quota
,
897 epa_vcpu_set
=epa_vcpu_set
,
900 resource_allocation
.assert_called_once_with(resource_allocation_param
)
901 self
.assertDictEqual(expected_result
, result
)
903 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
904 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
924 resource_allocation_param
= {
929 resource_allocation
.return_value
= {
935 result
= Ns
._process
_guest
_epa
_quota
_params
(
936 guest_epa_quota
=guest_epa_quota
,
937 epa_vcpu_set
=epa_vcpu_set
,
940 resource_allocation
.assert_called_once_with(resource_allocation_param
)
941 self
.assertDictEqual(expected_result
, result
)
943 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
944 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
964 resource_allocation_param
= {
969 resource_allocation
.return_value
= {
975 result
= Ns
._process
_guest
_epa
_quota
_params
(
976 guest_epa_quota
=guest_epa_quota
,
977 epa_vcpu_set
=epa_vcpu_set
,
980 resource_allocation
.assert_called_once_with(resource_allocation_param
)
981 self
.assertDictEqual(expected_result
, result
)
983 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
984 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
1004 resource_allocation_param
= {
1009 resource_allocation
.return_value
= {
1015 result
= Ns
._process
_guest
_epa
_quota
_params
(
1016 guest_epa_quota
=guest_epa_quota
,
1017 epa_vcpu_set
=epa_vcpu_set
,
1020 resource_allocation
.assert_called_once_with(resource_allocation_param
)
1021 self
.assertDictEqual(expected_result
, result
)
1023 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1024 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
1026 resource_allocation
,
1042 epa_vcpu_set
= False
1044 resource_allocation_param
= {
1049 resource_allocation
.return_value
= {
1055 result
= Ns
._process
_guest
_epa
_quota
_params
(
1056 guest_epa_quota
=guest_epa_quota
,
1057 epa_vcpu_set
=epa_vcpu_set
,
1060 resource_allocation
.assert_called_once_with(resource_allocation_param
)
1061 self
.assertDictEqual(expected_result
, result
)
1063 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1064 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1066 resource_allocation
,
1109 resource_allocation
.return_value
= {
1115 result
= Ns
._process
_guest
_epa
_quota
_params
(
1116 guest_epa_quota
=guest_epa_quota
,
1117 epa_vcpu_set
=epa_vcpu_set
,
1120 self
.assertTrue(resource_allocation
.called
)
1121 self
.assertDictEqual(expected_result
, result
)
1123 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1124 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1126 resource_allocation
,
1172 epa_vcpu_set
= False
1174 resource_allocation
.return_value
= {
1180 result
= Ns
._process
_guest
_epa
_quota
_params
(
1181 guest_epa_quota
=guest_epa_quota
,
1182 epa_vcpu_set
=epa_vcpu_set
,
1185 self
.assertTrue(resource_allocation
.called
)
1186 self
.assertDictEqual(expected_result
, result
)
def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    # Input: an empty guest EPA dictionary.
    empty_guest_epa = {}

    # Expectation: no NUMA entries and the vCPU flag reported as False.
    wanted_numa = []
    wanted_vcpu_flag = False

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_guest_epa,
    )

    self.assertEqual(wanted_numa, numa_nodes)
    self.assertEqual(wanted_vcpu_flag, vcpu_flag)
def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    # Input: a guest EPA dictionary whose only key is not "numa-node-policy".
    unrelated_guest_epa = {"no_nume": "here"}

    # Expectation: no NUMA entries and the vCPU flag reported as False.
    wanted_numa = []
    wanted_vcpu_flag = False

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_guest_epa,
    )

    self.assertEqual(wanted_numa, numa_nodes)
    self.assertEqual(wanted_vcpu_flag, vcpu_flag)
def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    # Input: a "numa-node-policy" entry that is present but empty.
    policy_only_guest_epa = {"numa-node-policy": {}}

    # Expectation: no NUMA entries and the vCPU flag reported as False.
    wanted_numa = []
    wanted_vcpu_flag = False

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy_only_guest_epa,
    )

    self.assertEqual(wanted_numa, numa_nodes)
    self.assertEqual(wanted_vcpu_flag, vcpu_flag)
1223 def test__process_guest_epa_numa_params_with_no_node(self
):
1224 expected_numa_result
= []
1225 expected_epa_vcpu_set_result
= False
1227 "numa-node-policy": {
1232 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1233 guest_epa_quota
=guest_epa_quota
,
1236 self
.assertEqual(expected_numa_result
, numa_result
)
1237 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1239 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1240 expected_numa_result
= [{"cores": 3}]
1241 expected_epa_vcpu_set_result
= True
1243 "numa-node-policy": {
1252 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1253 guest_epa_quota
=guest_epa_quota
,
1256 self
.assertEqual(expected_numa_result
, numa_result
)
1257 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1259 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1260 expected_numa_result
= [{"paired_threads": 3}]
1261 expected_epa_vcpu_set_result
= True
1263 "numa-node-policy": {
1266 "paired-threads": {"num-paired-threads": "3"},
1272 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1273 guest_epa_quota
=guest_epa_quota
,
1276 self
.assertEqual(expected_numa_result
, numa_result
)
1277 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1279 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1280 expected_numa_result
= [
1282 "paired-threads-id": [("0", "1"), ("4", "5")],
1285 expected_epa_vcpu_set_result
= False
1287 "numa-node-policy": {
1291 "paired-thread-ids": [
1307 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1308 guest_epa_quota
=guest_epa_quota
,
1311 self
.assertEqual(expected_numa_result
, numa_result
)
1312 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1314 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1315 expected_numa_result
= [{"threads": 3}]
1316 expected_epa_vcpu_set_result
= True
1318 "numa-node-policy": {
1327 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1328 guest_epa_quota
=guest_epa_quota
,
1331 self
.assertEqual(expected_numa_result
, numa_result
)
1332 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1334 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1335 expected_numa_result
= [{"memory": 2}]
1336 expected_epa_vcpu_set_result
= False
1338 "numa-node-policy": {
1347 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1348 guest_epa_quota
=guest_epa_quota
,
1351 self
.assertEqual(expected_numa_result
, numa_result
)
1352 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1354 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1355 expected_numa_result
= [
1361 expected_epa_vcpu_set_result
= False
1363 "numa-node-policy": {
1364 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1368 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1369 guest_epa_quota
=guest_epa_quota
,
1372 self
.assertEqual(expected_numa_result
, numa_result
)
1373 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1375 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1376 expected_numa_result
= [
1387 expected_epa_vcpu_set_result
= False
1389 "numa-node-policy": {
1391 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1392 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1397 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1398 guest_epa_quota
=guest_epa_quota
,
1401 self
.assertEqual(expected_numa_result
, numa_result
)
1402 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1404 def test__process_guest_epa_numa_params_with_1_node(self
):
1405 expected_numa_result
= [
1410 "paired_threads": 3,
1411 "paired-threads-id": [("0", "1"), ("4", "5")],
1416 expected_epa_vcpu_set_result
= True
1418 "numa-node-policy": {
1423 "num-paired-threads": "3",
1424 "paired-thread-ids": [
1442 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1443 guest_epa_quota
=guest_epa_quota
,
1446 self
.assertEqual(expected_numa_result
, numa_result
)
1447 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1449 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1450 expected_numa_result
= [
1453 "paired_threads": 3,
1454 "paired-threads-id": [("0", "1"), ("4", "5")],
1460 "paired_threads": 7,
1461 "paired-threads-id": [("2", "3"), ("5", "6")],
1466 expected_epa_vcpu_set_result
= True
1468 "numa-node-policy": {
1473 "num-paired-threads": "3",
1474 "paired-thread-ids": [
1491 "num-paired-threads": "7",
1492 "paired-thread-ids": [
1510 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1511 guest_epa_quota
=guest_epa_quota
,
1514 self
.assertEqual(expected_numa_result
, numa_result
)
1515 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1517 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1518 expected_numa_result
= {}
1519 expected_epa_vcpu_set_result
= False
1520 guest_epa_quota
= {}
1522 epa_vcpu_set
= False
1524 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1525 guest_epa_quota
=guest_epa_quota
,
1526 vcpu_count
=vcpu_count
,
1527 epa_vcpu_set
=epa_vcpu_set
,
1530 self
.assertDictEqual(expected_numa_result
, numa_result
)
1531 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1533 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1534 expected_numa_result
= {}
1535 expected_epa_vcpu_set_result
= False
1537 "no-cpu-pinning-policy": "here",
1540 epa_vcpu_set
= False
1542 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1543 guest_epa_quota
=guest_epa_quota
,
1544 vcpu_count
=vcpu_count
,
1545 epa_vcpu_set
=epa_vcpu_set
,
1548 self
.assertDictEqual(expected_numa_result
, numa_result
)
1549 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1551 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1552 expected_numa_result
= {}
1553 expected_epa_vcpu_set_result
= True
1555 "cpu-pinning-policy": "DEDICATED",
1560 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1561 guest_epa_quota
=guest_epa_quota
,
1562 vcpu_count
=vcpu_count
,
1563 epa_vcpu_set
=epa_vcpu_set
,
1566 self
.assertDictEqual(expected_numa_result
, numa_result
)
1567 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1569 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self
):
1570 expected_numa_result
= {"threads": 3}
1571 expected_epa_vcpu_set_result
= True
1573 "cpu-pinning-policy": "DEDICATED",
1574 "cpu-thread-pinning-policy": "PREFER",
1577 epa_vcpu_set
= False
1579 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1580 guest_epa_quota
=guest_epa_quota
,
1581 vcpu_count
=vcpu_count
,
1582 epa_vcpu_set
=epa_vcpu_set
,
1585 self
.assertDictEqual(expected_numa_result
, numa_result
)
1586 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1588 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self
):
1589 expected_numa_result
= {"cores": 3}
1590 expected_epa_vcpu_set_result
= True
1592 "cpu-pinning-policy": "DEDICATED",
1593 "cpu-thread-pinning-policy": "ISOLATE",
1596 epa_vcpu_set
= False
1598 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1599 guest_epa_quota
=guest_epa_quota
,
1600 vcpu_count
=vcpu_count
,
1601 epa_vcpu_set
=epa_vcpu_set
,
1604 self
.assertDictEqual(expected_numa_result
, numa_result
)
1605 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1607 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self
):
1608 expected_numa_result
= {"threads": 3}
1609 expected_epa_vcpu_set_result
= True
1611 "cpu-pinning-policy": "DEDICATED",
1612 "cpu-thread-pinning-policy": "REQUIRE",
1615 epa_vcpu_set
= False
1617 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1618 guest_epa_quota
=guest_epa_quota
,
1619 vcpu_count
=vcpu_count
,
1620 epa_vcpu_set
=epa_vcpu_set
,
1623 self
.assertDictEqual(expected_numa_result
, numa_result
)
1624 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1626 def test__process_guest_epa_cpu_pinning_params(self
):
1627 expected_numa_result
= {"threads": 3}
1628 expected_epa_vcpu_set_result
= True
1630 "cpu-pinning-policy": "DEDICATED",
1633 epa_vcpu_set
= False
1635 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1636 guest_epa_quota
=guest_epa_quota
,
1637 vcpu_count
=vcpu_count
,
1638 epa_vcpu_set
=epa_vcpu_set
,
1641 self
.assertDictEqual(expected_numa_result
, numa_result
)
1642 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1644 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1645 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1646 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1647 def test__process_guest_epa_params_with_empty_params(
1649 guest_epa_numa_params
,
1650 guest_epa_cpu_pinning_params
,
1651 guest_epa_quota_params
,
1653 expected_result
= {}
1656 result
= Ns
._process
_epa
_params
(
1657 target_flavor
=target_flavor
,
1660 self
.assertDictEqual(expected_result
, result
)
1661 self
.assertFalse(guest_epa_numa_params
.called
)
1662 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1663 self
.assertFalse(guest_epa_quota_params
.called
)
1665 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1666 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1667 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1668 def test__process_guest_epa_params_with_wrong_params(
1670 guest_epa_numa_params
,
1671 guest_epa_cpu_pinning_params
,
1672 guest_epa_quota_params
,
1674 expected_result
= {}
1676 "no-guest-epa": "here",
1679 result
= Ns
._process
_epa
_params
(
1680 target_flavor
=target_flavor
,
1683 self
.assertDictEqual(expected_result
, result
)
1684 self
.assertFalse(guest_epa_numa_params
.called
)
1685 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1686 self
.assertFalse(guest_epa_quota_params
.called
)
1688 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1689 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1690 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1691 def test__process_guest_epa_params(
1693 guest_epa_numa_params
,
1694 guest_epa_cpu_pinning_params
,
1695 guest_epa_quota_params
,
1698 "mem-policy": "STRICT",
1703 "numa-node-policy": {
1704 "mem-policy": "STRICT",
1709 guest_epa_numa_params
.return_value
= ({}, False)
1710 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1711 guest_epa_quota_params
.return_value
= {}
1713 result
= Ns
._process
_epa
_params
(
1714 target_flavor
=target_flavor
,
1717 self
.assertDictEqual(expected_result
, result
)
1718 self
.assertTrue(guest_epa_numa_params
.called
)
1719 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1720 self
.assertTrue(guest_epa_quota_params
.called
)
1722 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1723 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1724 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1725 def test__process_guest_epa_params_with_mempage_size(
1727 guest_epa_numa_params
,
1728 guest_epa_cpu_pinning_params
,
1729 guest_epa_quota_params
,
1732 "mempage-size": "1G",
1733 "mem-policy": "STRICT",
1738 "mempage-size": "1G",
1739 "numa-node-policy": {
1740 "mem-policy": "STRICT",
1745 guest_epa_numa_params
.return_value
= ({}, False)
1746 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1747 guest_epa_quota_params
.return_value
= {}
1749 result
= Ns
._process
_epa
_params
(
1750 target_flavor
=target_flavor
,
1753 self
.assertDictEqual(expected_result
, result
)
1754 self
.assertTrue(guest_epa_numa_params
.called
)
1755 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1756 self
.assertTrue(guest_epa_quota_params
.called
)
1758 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1759 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1760 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1761 def test__process_guest_epa_params_with_numa(
1763 guest_epa_numa_params
,
1764 guest_epa_cpu_pinning_params
,
1765 guest_epa_quota_params
,
1768 "mempage-size": "1G",
1769 "cpu-pinning-policy": "DEDICATED",
1770 "cpu-thread-pinning-policy": "PREFER",
1775 "paired-threads": 3,
1776 "paired-threads-id": [("0", "1"), ("4", "5")],
1780 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1781 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1782 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1783 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1788 "mempage-size": "1G",
1789 "cpu-pinning-policy": "DEDICATED",
1790 "cpu-thread-pinning-policy": "PREFER",
1791 "numa-node-policy": {
1796 "num-paired-threads": "3",
1797 "paired-thread-ids": [
1836 guest_epa_numa_params
.return_value
= (
1840 "paired-threads": 3,
1841 "paired-threads-id": [("0", "1"), ("4", "5")],
1848 guest_epa_cpu_pinning_params
.return_value
= (
1854 guest_epa_quota_params
.return_value
= {
1877 result
= Ns
._process
_epa
_params
(
1878 target_flavor
=target_flavor
,
1880 self
.assertEqual(expected_result
, result
)
1881 self
.assertTrue(guest_epa_numa_params
.called
)
1882 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1883 self
.assertTrue(guest_epa_quota_params
.called
)
1885 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1886 def test__process_flavor_params_with_empty_target_flavor(
1894 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1899 target_record_id
= ""
1901 with self
.assertRaises(KeyError):
1902 Ns
._process
_flavor
_params
(
1903 target_flavor
=target_flavor
,
1906 target_record_id
=target_record_id
,
1909 self
.assertFalse(epa_params
.called
)
1911 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1912 def test__process_flavor_params_with_wrong_target_flavor(
1917 "no-target-flavor": "here",
1921 target_record_id
= ""
1923 with self
.assertRaises(KeyError):
1924 Ns
._process
_flavor
_params
(
1925 target_flavor
=target_flavor
,
1928 target_record_id
=target_record_id
,
1931 self
.assertFalse(epa_params
.called
)
1933 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1934 def test__process_flavor_params_with_empty_indata(
1958 "memory-mb": "1024",
1963 target_record_id
= ""
1965 epa_params
.return_value
= {}
1967 result
= Ns
._process
_flavor
_params
(
1968 target_flavor
=target_flavor
,
1971 target_record_id
=target_record_id
,
1974 self
.assertTrue(epa_params
.called
)
1975 self
.assertDictEqual(result
, expected_result
)
1977 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1978 def test__process_flavor_params_with_wrong_indata(
2002 "memory-mb": "1024",
2009 target_record_id
= ""
2011 epa_params
.return_value
= {}
2013 result
= Ns
._process
_flavor
_params
(
2014 target_flavor
=target_flavor
,
2017 target_record_id
=target_record_id
,
2020 self
.assertTrue(epa_params
.called
)
2021 self
.assertDictEqual(result
, expected_result
)
2023 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2024 def test__process_flavor_params_with_ephemeral_disk(
2032 db
.get_one
.return_value
= {
2033 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2038 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2042 "id": "without_volumes-vnf",
2043 "product-name": "without_volumes-vnf",
2046 "id": "without_volumes-VM",
2047 "name": "without_volumes-VM",
2048 "sw-image-desc": "ubuntu20.04",
2049 "alternative-sw-image-desc": [
2051 "ubuntu20.04-azure",
2053 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2057 "virtual-storage-desc": [
2058 {"id": "root-volume", "size-of-storage": "10"},
2060 "id": "ephemeral-volume",
2061 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2062 "size-of-storage": "1",
2068 "path": "/app/storage/",
2096 "memory-mb": "1024",
2104 "ns-flavor-id": "test_id",
2105 "virtual-storages": [
2107 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2108 "size-of-storage": "10",
2113 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2118 target_record_id
= ""
2120 epa_params
.return_value
= {}
2122 result
= Ns
._process
_flavor
_params
(
2123 target_flavor
=target_flavor
,
2126 target_record_id
=target_record_id
,
2130 self
.assertTrue(epa_params
.called
)
2131 self
.assertDictEqual(result
, expected_result
)
2133 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2134 def test__process_flavor_params_with_swap_disk(
2161 "memory-mb": "1024",
2169 "ns-flavor-id": "test_id",
2170 "virtual-storages": [
2172 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2173 "size-of-storage": "20",
2178 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2183 target_record_id
= ""
2185 epa_params
.return_value
= {}
2187 result
= Ns
._process
_flavor
_params
(
2188 target_flavor
=target_flavor
,
2191 target_record_id
=target_record_id
,
2194 self
.assertTrue(epa_params
.called
)
2195 self
.assertDictEqual(result
, expected_result
)
2197 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2198 def test__process_flavor_params_with_persistent_root_disk(
2206 db
.get_one
.return_value
= {
2207 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2212 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2216 "id": "several_volumes-vnf",
2217 "product-name": "several_volumes-vnf",
2220 "id": "several_volumes-VM",
2221 "name": "several_volumes-VM",
2222 "sw-image-desc": "ubuntu20.04",
2223 "alternative-sw-image-desc": [
2225 "ubuntu20.04-azure",
2227 "virtual-storage-desc": [
2228 "persistent-root-volume",
2233 "virtual-storage-desc": [
2235 "id": "persistent-root-volume",
2236 "type-of-storage": "persistent-storage:persistent-storage",
2237 "size-of-storage": "10",
2243 "path": "/app/storage/",
2269 "memory-mb": "1024",
2277 "vdu-name": "several_volumes-VM",
2278 "ns-flavor-id": "test_id",
2279 "virtual-storages": [
2281 "type-of-storage": "persistent-storage:persistent-storage",
2282 "size-of-storage": "10",
2287 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2292 target_record_id
= ""
2294 epa_params
.return_value
= {}
2296 result
= Ns
._process
_flavor
_params
(
2297 target_flavor
=target_flavor
,
2300 target_record_id
=target_record_id
,
2304 self
.assertTrue(epa_params
.called
)
2305 self
.assertDictEqual(result
, expected_result
)
2307 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2308 def test__process_flavor_params_with_epa_params(
2319 "numa": "there-is-numa-here",
2330 "numa": "there-is-numa-here",
2339 "memory-mb": "1024",
2347 "ns-flavor-id": "test_id",
2350 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2355 target_record_id
= ""
2357 epa_params
.return_value
= {
2358 "numa": "there-is-numa-here",
2361 result
= Ns
._process
_flavor
_params
(
2362 target_flavor
=target_flavor
,
2365 target_record_id
=target_record_id
,
2368 self
.assertTrue(epa_params
.called
)
2369 self
.assertDictEqual(result
, expected_result
)
2371 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2372 def test__process_flavor_params_with_vim_flavor_id(
2378 "vim_flavor_id": "test.flavor",
2385 "memory-mb": "1024",
2393 "ns-flavor-id": "test_id",
2394 "additionalParams": {
2395 "OSM": {"vim_flavor_id": "test.flavor"}
2399 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2404 target_record_id
= ""
2406 epa_params
.return_value
= {}
2408 result
= Ns
._process
_flavor
_params
(
2409 target_flavor
=target_flavor
,
2412 target_record_id
=target_record_id
,
2415 self
.assertFalse(epa_params
.called
)
2416 self
.assertDictEqual(result
, expected_result
)
2418 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2419 def test__process_flavor_params(
2427 db
.get_one
.return_value
= {
2428 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2433 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2437 "id": "without_volumes-vnf",
2438 "product-name": "without_volumes-vnf",
2441 "id": "without_volumes-VM",
2442 "name": "without_volumes-VM",
2443 "sw-image-desc": "ubuntu20.04",
2444 "alternative-sw-image-desc": [
2446 "ubuntu20.04-azure",
2448 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2452 "virtual-storage-desc": [
2453 {"id": "root-volume", "size-of-storage": "10"},
2455 "id": "ephemeral-volume",
2456 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2457 "size-of-storage": "1",
2463 "path": "/app/storage/",
2478 "numa": "there-is-numa-here",
2491 "numa": "there-is-numa-here",
2500 "memory-mb": "1024",
2508 "ns-flavor-id": "test_id",
2509 "virtual-storages": [
2511 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2512 "size-of-storage": "10",
2515 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2516 "size-of-storage": "20",
2521 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2526 target_record_id
= ""
2528 epa_params
.return_value
= {
2529 "numa": "there-is-numa-here",
2532 result
= Ns
._process
_flavor
_params
(
2533 target_flavor
=target_flavor
,
2536 target_record_id
=target_record_id
,
2540 self
.assertTrue(epa_params
.called
)
2541 self
.assertDictEqual(result
, expected_result
)
2543 def test__process_net_params_with_empty_params(
2553 "provider_network": "some-profile-here",
2555 "some_ip_profile": "here",
2558 target_record_id
= ""
2561 "net_name": "ns-name-vld-name",
2562 "net_type": "bridge",
2564 "some_ip_profile": "here",
2566 "provider_network_profile": "some-profile-here",
2570 result
= Ns
._process
_net
_params
(
2571 target_vld
=target_vld
,
2574 target_record_id
=target_record_id
,
2577 self
.assertDictEqual(expected_result
, result
)
2579 def test__process_net_params_with_vim_info_sdn(
2590 "sdn-ports": ["some", "ports", "here"],
2591 "vlds": ["some", "vlds", "here"],
2594 target_record_id
= "vld.sdn.something"
2597 "sdn-ports": ["some", "ports", "here"],
2598 "vlds": ["some", "vlds", "here"],
2603 result
= Ns
._process
_net
_params
(
2604 target_vld
=target_vld
,
2607 target_record_id
=target_record_id
,
2610 self
.assertDictEqual(expected_result
, result
)
2612 def test__process_net_params_with_vim_info_sdn_target_vim(
2623 "sdn-ports": ["some", "ports", "here"],
2624 "vlds": ["some", "vlds", "here"],
2625 "target_vim": "some-vim",
2628 target_record_id
= "vld.sdn.something"
2630 "depends_on": ["some-vim vld.sdn"],
2632 "sdn-ports": ["some", "ports", "here"],
2633 "vlds": ["some", "vlds", "here"],
2634 "target_vim": "some-vim",
2639 result
= Ns
._process
_net
_params
(
2640 target_vld
=target_vld
,
2643 target_record_id
=target_record_id
,
2646 self
.assertDictEqual(expected_result
, result
)
2648 def test__process_net_params_with_vim_network_name(
2658 "vim_network_name": "some-network-name",
2660 target_record_id
= "vld.sdn.something"
2664 "name": "some-network-name",
2669 result
= Ns
._process
_net
_params
(
2670 target_vld
=target_vld
,
2673 target_record_id
=target_record_id
,
2676 self
.assertDictEqual(expected_result
, result
)
2678 def test__process_net_params_with_vim_network_id(
2688 "vim_network_id": "some-network-id",
2690 target_record_id
= "vld.sdn.something"
2694 "id": "some-network-id",
2699 result
= Ns
._process
_net
_params
(
2700 target_vld
=target_vld
,
2703 target_record_id
=target_record_id
,
2706 self
.assertDictEqual(expected_result
, result
)
2708 def test__process_net_params_with_mgmt_network(
2714 "mgmt-network": "some-mgmt-network",
2720 target_record_id
= "vld.sdn.something"
2728 result
= Ns
._process
_net
_params
(
2729 target_vld
=target_vld
,
2732 target_record_id
=target_record_id
,
2735 self
.assertDictEqual(expected_result
, result
)
2737 def test__process_net_params_with_underlay_eline(
2742 "underlay": "some-underlay-here",
2749 "provider_network": "some-profile-here",
2751 "some_ip_profile": "here",
2754 target_record_id
= ""
2758 "some_ip_profile": "here",
2760 "net_name": "ns-name-vld-name",
2762 "provider_network_profile": "some-profile-here",
2766 result
= Ns
._process
_net
_params
(
2767 target_vld
=target_vld
,
2770 target_record_id
=target_record_id
,
2773 self
.assertDictEqual(expected_result
, result
)
2775 def test__process_net_params_with_underlay_elan(
2780 "underlay": "some-underlay-here",
2787 "provider_network": "some-profile-here",
2789 "some_ip_profile": "here",
2792 target_record_id
= ""
2796 "some_ip_profile": "here",
2798 "net_name": "ns-name-vld-name",
2800 "provider_network_profile": "some-profile-here",
2804 result
= Ns
._process
_net
_params
(
2805 target_vld
=target_vld
,
2808 target_record_id
=target_record_id
,
2811 self
.assertDictEqual(expected_result
, result
)
2813 def test__get_cloud_init_exception(self
):
2814 db_mock
= MagicMock(name
="database mock")
2819 with self
.assertRaises(NsException
):
2820 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2822 def test__get_cloud_init_file_fs_exception(self
):
2823 db_mock
= MagicMock(name
="database mock")
2826 location
= "vnfr_id_123456:file:test_file"
2827 db_mock
.get_one
.return_value
= {
2830 "folder": "/home/osm",
2831 "pkg-dir": "vnfr_test_dir",
2836 with self
.assertRaises(NsException
):
2837 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2839 def test__get_cloud_init_file(self
):
2840 db_mock
= MagicMock(name
="database mock")
2841 fs_mock
= MagicMock(name
="filesystem mock")
2842 file_mock
= MagicMock(name
="file mock")
2844 location
= "vnfr_id_123456:file:test_file"
2845 cloud_init_content
= "this is a cloud init file content"
2847 db_mock
.get_one
.return_value
= {
2850 "folder": "/home/osm",
2851 "pkg-dir": "vnfr_test_dir",
2855 fs_mock
.file_open
.return_value
= file_mock
2856 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2858 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2860 self
.assertEqual(cloud_init_content
, result
)
2862 def test__get_cloud_init_vdu(self
):
2863 db_mock
= MagicMock(name
="database mock")
2866 location
= "vnfr_id_123456:vdu:0"
2867 cloud_init_content
= "this is a cloud init file content"
2869 db_mock
.get_one
.return_value
= {
2872 "cloud-init": cloud_init_content
,
2877 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2879 self
.assertEqual(cloud_init_content
, result
)
2881 @patch("jinja2.Environment.__init__")
2882 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2883 cloud_init_content
= None
2887 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
2889 with self
.assertRaises(NsException
):
2891 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2894 @patch("jinja2.Environment.__init__")
2895 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
2896 cloud_init_content
= None
2900 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
2902 with self
.assertRaises(NsException
):
2904 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2907 @patch("jinja2.Environment.__init__")
2908 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
2909 cloud_init_content
= None
2913 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
2915 with self
.assertRaises(NsException
):
2917 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2920 def test_rendering_jinja2_temp_without_special_characters(self
):
2921 cloud_init_content
= """
2924 table_type: {{type}}
2926 overwrite: {{is_override}}
2929 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2933 "is_override": "False",
2934 "command": "; mkdir abc",
2936 context
= "cloud-init for VM"
2937 expected_result
= """
2945 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
2947 result
= Ns
._parse
_jinja
2(
2948 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2950 self
.assertEqual(result
, expected_result
)
2952 def test_rendering_jinja2_temp_with_special_characters(self
):
2953 cloud_init_content
= """
2956 table_type: {{type}}
2958 overwrite: {{is_override}}
2961 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2965 "is_override": "False",
2966 "command": "& rm -rf",
2968 context
= "cloud-init for VM"
2969 expected_result
= """
2977 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2979 result
= Ns
._parse
_jinja
2(
2980 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2982 self
.assertNotEqual(result
, expected_result
)
2984 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
2985 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
2986 mock_environment
.return_value
= Environment(
2987 undefined
=StrictUndefined
,
2988 autoescape
=select_autoescape(default_for_string
=False, default
=False),
2990 cloud_init_content
= """
2993 table_type: {{type}}
2995 overwrite: {{is_override}}
2998 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3002 "is_override": "False",
3003 "command": "& rm -rf /",
3005 context
= "cloud-init for VM"
3006 expected_result
= """
3014 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3016 result
= Ns
._parse
_jinja
2(
3017 cloud_init_content
=cloud_init_content
,
3021 self
.assertEqual(result
, expected_result
)
3023 @patch("osm_ng_ro.ns.Ns._assign_vim")
3024 def test__rebuild_start_stop_task__successful(self
, assign_vim
):
3027 actions
= ["start", "stop", "rebuild"]
3030 for action
in actions
:
3032 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3035 extra_dict
["params"] = params
3036 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3039 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3040 expected_result
["params"] = params
3041 task
= self
.ns
.rebuild_start_stop_task(
3051 self
.assertDictEqual(task
, expected_result
)
3053 @patch("osm_ng_ro.ns.Ns._assign_vim")
3054 def test__rebuild_start_stop_task__empty_extra_dict__task_without_params(
3059 actions
= ["start", "stop", "rebuild"]
3062 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3065 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3067 task
= self
.ns
.rebuild_start_stop_task(
3077 self
.assertDictEqual(task
, expected_result
)
3079 @patch("osm_ng_ro.ns.Ns._assign_vim")
3080 def test__rebuild_start_stop_task__different_vdu_index__target_record_changes(
3085 actions
= ["start", "stop", "rebuild"]
3088 for action
in actions
:
3090 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3093 extra_dict
["params"] = params
3094 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3097 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3098 expected_result
["params"] = params
3099 task
= self
.ns
.rebuild_start_stop_task(
3109 self
.assertDictEqual(task
, expected_result
)
3111 @patch("osm_ng_ro.ns.Ns._assign_vim")
3112 def test__rebuild_start_stop_task__different_task_index__task_id_changes(
3117 actions
= ["start", "stop", "rebuild"]
3120 for action
in actions
:
3122 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3125 extra_dict
["params"] = params
3126 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3129 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3130 expected_result
["params"] = params
3131 expected_result
["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:3"
3132 task
= self
.ns
.rebuild_start_stop_task(
3142 self
.assertDictEqual(task
, expected_result
)
3144 @patch("osm_ng_ro.ns.Ns._assign_vim")
3145 def test__rebuild_start_stop_task__assign_vim_raises__task_is_not_created(
3150 actions
= ["start", "stop", "rebuild"]
3153 for action
in actions
:
3155 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3158 extra_dict
["params"] = params
3159 assign_vim
.side_effect
= TestException("Can not connect to VIM.")
3160 with self
.assertRaises(TestException
) as err
:
3161 task
= self
.ns
.rebuild_start_stop_task(
3171 self
.assertEqual(task
, None)
3172 self
.assertEqual(str(err
.exception
), "Can not connect to VIM.")
3174 @patch("osm_ng_ro.ns.Ns._assign_vim")
3175 def test_verticalscale_task__successful(self
, assign_vim
):
3179 task
= self
.ns
.verticalscale_task(
3186 extra_dict_vertical_scale
,
3188 self
.assertDictEqual(task
, expected_result_vertical_scale
)
3190 @patch("osm_ng_ro.ns.Ns._assign_vim")
3191 def test_verticalscale_task__task_index_changes__task_id_changes(self
, assign_vim
):
3195 expected_result
= deepcopy(expected_result_vertical_scale
)
3196 expected_result
["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:2"
3197 task
= self
.ns
.verticalscale_task(
3204 extra_dict_vertical_scale
,
3206 self
.assertDictEqual(task
, expected_result
)
3208 @patch("osm_ng_ro.ns.Ns._assign_vim")
3209 def test_verticalscale_task__empty_extra_dict__expected_result_without_params(
3216 expected_result
= deepcopy(expected_result_vertical_scale
)
3217 expected_result
.pop("params")
3218 task
= self
.ns
.verticalscale_task(
3219 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict
3221 self
.assertDictEqual(task
, expected_result
)
3223 @patch("osm_ng_ro.ns.Ns._assign_vim")
3224 def test_verticalscale_task__assign_vim_raises__task_is_not_created(
3230 assign_vim
.side_effect
= TestException("Can not connect to VIM.")
3231 with self
.assertRaises(TestException
) as err
:
3232 task
= self
.ns
.verticalscale_task(
3239 extra_dict_vertical_scale
,
3241 self
.assertEqual(task
, {})
3242 self
.assertEqual(str(err
.exception
), "Can not connect to VIM.")
3244 @patch("osm_ng_ro.ns.Ns._assign_vim")
3245 def test_migrate_task__successful(self
, assign_vim
):
3249 task
= self
.ns
.migrate_task(
3250 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict_migrate
3252 self
.assertDictEqual(task
, expected_result_migrate
)
3254 @patch("osm_ng_ro.ns.Ns._assign_vim")
3255 def test_migrate_task__empty_extra_dict__task_without_params(self
, assign_vim
):
3260 expected_result
= deepcopy(expected_result_migrate
)
3261 expected_result
.pop("params")
3262 task
= self
.ns
.migrate_task(
3263 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict
3265 self
.assertDictEqual(task
, expected_result
)
3267 @patch("osm_ng_ro.ns.Ns._assign_vim")
3268 def test_migrate_task__different_vdu_index__target_record_with_different_vdu_index(
3274 expected_result
= deepcopy(expected_result_migrate
)
3277 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3278 task
= self
.ns
.migrate_task(
3279 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict_migrate
3281 self
.assertDictEqual(task
, expected_result
)
3283 @patch("osm_ng_ro.ns.Ns._assign_vim")
3284 def test_migrate_task__assign_vim_raises__task_is_not_created(self
, assign_vim
):
3288 assign_vim
.side_effect
= TestException("Can not connect to VIM.")
3289 with self
.assertRaises(TestException
) as err
:
3290 task
= self
.ns
.migrate_task(
3291 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict_migrate
3293 self
.assertDictEqual(task
, {})
3294 self
.assertEqual(str(err
.exception
), "Can not connect to VIM.")
3297 class TestProcessVduParams(unittest
.TestCase
):
3300 self
.logger
= CopyingMock(autospec
=True)
3302 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3303 def test_find_persistent_root_volumes_empty_instantiation_vol_list(
3304 self
, mock_volume_keeping_required
3306 """Find persistent root volume, instantiation_vol_list is empty."""
3307 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3308 target_vdu
= target_vdu_wth_persistent_storage
3309 vdu_instantiation_volumes_list
= []
3311 mock_volume_keeping_required
.return_value
= True
3312 expected_root_disk
= {
3313 "id": "persistent-root-volume",
3314 "type-of-storage": "persistent-storage:persistent-storage",
3315 "size-of-storage": "10",
3316 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
3318 expected_persist_root_disk
= {
3319 "persistent-root-volume": {
3320 "image_id": "ubuntu20.04",
3325 expected_disk_list
= [
3327 "image_id": "ubuntu20.04",
3332 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3333 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3335 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3336 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3337 self
.assertEqual(disk_list
, expected_disk_list
)
3338 self
.assertEqual(len(disk_list
), 1)
3340 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3341 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
3342 self
, mock_volume_keeping_required
3344 """Find persistent root volume, always selects the first vsd as root volume."""
3345 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3346 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3347 "persistent-volume2",
3348 "persistent-root-volume",
3351 target_vdu
= target_vdu_wth_persistent_storage
3352 vdu_instantiation_volumes_list
= []
3354 mock_volume_keeping_required
.return_value
= True
3355 expected_root_disk
= {
3356 "id": "persistent-volume2",
3357 "type-of-storage": "persistent-storage:persistent-storage",
3358 "size-of-storage": "10",
3360 expected_persist_root_disk
= {
3361 "persistent-volume2": {
3362 "image_id": "ubuntu20.04",
3367 expected_disk_list
= [
3369 "image_id": "ubuntu20.04",
3374 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3375 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3377 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3378 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3379 self
.assertEqual(disk_list
, expected_disk_list
)
3380 self
.assertEqual(len(disk_list
), 1)
3382 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3383 def test_find_persistent_root_volumes_empty_size_of_storage(
3384 self
, mock_volume_keeping_required
3386 """Find persistent root volume, size of storage is empty."""
3387 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3388 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3389 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3390 "persistent-volume2",
3391 "persistent-root-volume",
3394 target_vdu
= target_vdu_wth_persistent_storage
3395 vdu_instantiation_volumes_list
= []
3397 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3398 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3400 self
.assertEqual(persist_root_disk
, None)
3401 mock_volume_keeping_required
.assert_not_called()
3402 self
.assertEqual(disk_list
, [])
3404 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3405 def test_find_persistent_root_volumes_keeping_is_not_required(
3406 self
, mock_volume_keeping_required
3408 """Find persistent root volume, volume keeping is not required."""
3409 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3410 vnfd
["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
3411 {"key": "keep-volume", "value": "false"},
3413 target_vdu
= target_vdu_wth_persistent_storage
3414 vdu_instantiation_volumes_list
= []
3416 mock_volume_keeping_required
.return_value
= False
3417 expected_root_disk
= {
3418 "id": "persistent-root-volume",
3419 "type-of-storage": "persistent-storage:persistent-storage",
3420 "size-of-storage": "10",
3421 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
3423 expected_persist_root_disk
= {
3424 "persistent-root-volume": {
3425 "image_id": "ubuntu20.04",
3430 expected_disk_list
= [
3432 "image_id": "ubuntu20.04",
3437 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3438 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3440 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3441 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3442 self
.assertEqual(disk_list
, expected_disk_list
)
3443 self
.assertEqual(len(disk_list
), 1)
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_target_vdu_mismatch(
    self, mock_volume_keeping_required
):
    """Root-volume lookup finds nothing when the VNFD vdu name does not match.

    The vdu inside a private copy of the VNFD fixture is renamed to
    "Several_Volumes-VM" before the lookup. The test then expects a ``None``
    result, an untouched disk list and no call to the patched
    ``is_volume_keeping_required``.
    """
    collected_disks = []
    no_instantiation_volumes = []

    # Deep copy so the rename below never leaks into the shared fixture.
    renamed_vnfd = deepcopy(vnfd_wth_persistent_storage)
    renamed_vnfd["vdu"][0]["name"] = "Several_Volumes-VM"

    root_disk = self.ns.find_persistent_root_volumes(
        renamed_vnfd,
        target_vdu_wth_persistent_storage,
        no_instantiation_volumes,
        collected_disks,
    )

    self.assertEqual(root_disk, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, [])
    self.assertEqual(len(collected_disks), 0)
3463 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3464 def test_find_persistent_root_volumes_with_instantiation_vol_list(
3465 self
, mock_volume_keeping_required
3467 """Find persistent root volume, existing volume needs to be used."""
3468 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3469 target_vdu
= target_vdu_wth_persistent_storage
3470 vdu_instantiation_volumes_list
= [
3472 "vim-volume-id": vim_volume_id
,
3473 "name": "persistent-root-volume",
3477 expected_persist_root_disk
= {
3478 "persistent-root-volume": {
3479 "vim_volume_id": vim_volume_id
,
3480 "image_id": "ubuntu20.04",
3483 expected_disk_list
= [
3485 "vim_volume_id": vim_volume_id
,
3486 "image_id": "ubuntu20.04",
3489 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3490 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3492 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3493 mock_volume_keeping_required
.assert_not_called()
3494 self
.assertEqual(disk_list
, expected_disk_list
)
3495 self
.assertEqual(len(disk_list
), 1)
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_invalid_instantiation_params(
    self, mock_volume_keeping_required
):
    """Root-volume lookup raises KeyError for an invalid volume id keyword.

    The instantiation volume entry carries the existing volume id under the
    key "volume-id". The lookup is expected to raise ``KeyError`` without
    calling the patched ``is_volume_keeping_required`` and without adding
    anything to the disk list.
    """
    collected_disks = []
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    # The invalid keyword is the whole point of this scenario.
    invalid_volume_entry = {
        "volume-id": vim_volume_id,
        "name": "persistent-root-volume",
    }

    with self.assertRaises(KeyError):
        self.ns.find_persistent_root_volumes(
            descriptor,
            target_vdu_wth_persistent_storage,
            [invalid_volume_entry],
            collected_disks,
        )

    # Checked after the context manager so they run once the error is caught.
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, [])
    self.assertEqual(len(collected_disks), 0)
3519 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3520 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3521 self
, mock_volume_keeping_required
3523 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3524 persistent_root_disk
= {
3525 "persistent-root-volume": {
3526 "image_id": "ubuntu20.04",
3531 mock_volume_keeping_required
.return_value
= False
3532 target_vdu
= target_vdu_wth_persistent_storage
3533 vdu_instantiation_volumes_list
= []
3536 "image_id": "ubuntu20.04",
3542 "id": "persistent-volume2",
3543 "size-of-storage": "10",
3544 "type-of-storage": "persistent-storage:persistent-storage",
3546 expected_disk_list
= [
3548 "image_id": "ubuntu20.04",
3557 self
.ns
.find_persistent_volumes(
3558 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3560 self
.assertEqual(disk_list
, expected_disk_list
)
3561 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3563 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3564 def test_find_persistent_volumes_vdu_wth_inst_vol_list(
3565 self
, mock_volume_keeping_required
3567 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3568 persistent_root_disk
= {
3569 "persistent-root-volume": {
3570 "image_id": "ubuntu20.04",
3575 vdu_instantiation_volumes_list
= [
3577 "vim-volume-id": vim_volume_id
,
3578 "name": "persistent-volume2",
3581 target_vdu
= target_vdu_wth_persistent_storage
3584 "image_id": "ubuntu20.04",
3589 expected_disk_list
= [
3591 "image_id": "ubuntu20.04",
3596 "vim_volume_id": vim_volume_id
,
3599 self
.ns
.find_persistent_volumes(
3600 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3602 self
.assertEqual(disk_list
, expected_disk_list
)
3603 mock_volume_keeping_required
.assert_not_called()
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wthout_persistent_storage(
    self, mock_volume_keeping_required
):
    """Find persistent ordinary volume, there is not any persistent disk.

    With no persistent root disk and no instantiation volumes, the disk list
    must stay empty and the keep-volume check must never be consulted.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    # Private copy: keeps the shared module-level fixture untouched.
    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # Compare against an independent expected value; comparing the list with
    # itself could never fail.
    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()
3621 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3622 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3623 self
, mock_volume_keeping_required
3625 """There is persistent root disk, but there is not ordinary persistent disk."""
3626 persistent_root_disk
= {
3627 "persistent-root-volume": {
3628 "image_id": "ubuntu20.04",
3633 vdu_instantiation_volumes_list
= []
3634 mock_volume_keeping_required
.return_value
= False
3635 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3636 target_vdu
["virtual-storages"] = [
3638 "id": "persistent-root-volume",
3639 "size-of-storage": "10",
3640 "type-of-storage": "persistent-storage:persistent-storage",
3641 "vdu-storage-requirements": [
3642 {"key": "keep-volume", "value": "true"},
3646 "id": "ephemeral-volume",
3647 "size-of-storage": "1",
3648 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3653 "image_id": "ubuntu20.04",
3658 self
.ns
.find_persistent_volumes(
3659 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3661 self
.assertEqual(disk_list
, disk_list
)
3662 mock_volume_keeping_required
.assert_not_called()
3664 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3665 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
3666 self
, mock_volume_keeping_required
3668 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3669 vim-volume-id is given as instantiation parameter but disk id is not matching.
3671 mock_volume_keeping_required
.return_value
= True
3672 vdu_instantiation_volumes_list
= [
3674 "vim-volume-id": vim_volume_id
,
3675 "name": "persistent-volume3",
3678 persistent_root_disk
= {
3679 "persistent-root-volume": {
3680 "image_id": "ubuntu20.04",
3687 "image_id": "ubuntu20.04",
3692 expected_disk_list
= [
3694 "image_id": "ubuntu20.04",
3704 "id": "persistent-volume2",
3705 "size-of-storage": "10",
3706 "type-of-storage": "persistent-storage:persistent-storage",
3708 target_vdu
= target_vdu_wth_persistent_storage
3709 self
.ns
.find_persistent_volumes(
3710 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3712 self
.assertEqual(disk_list
, expected_disk_list
)
3713 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
def test_is_volume_keeping_required_true(self):
    """Expect True when the descriptor carries keep-volume set to "true"."""
    keep_requirement = {"key": "keep-volume", "value": "true"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), True)
def test_is_volume_keeping_required_false(self):
    """Expect False when the descriptor carries keep-volume set to "false"."""
    keep_requirement = {"key": "keep-volume", "value": "false"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), False)
def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """Expect False when the descriptor has no vdu-storage-requirements key."""
    storage_desc = dict(
        [
            ("id", "persistent-root-volume"),
            ("type-of-storage", "persistent-storage:persistent-storage"),
            ("size-of-storage", "10"),
        ]
    )
    keep = self.ns.is_volume_keeping_required(storage_desc)
    self.assertEqual(keep, False)
def test_is_volume_keeping_required_wrong_keyword(self):
    """Expect False when the requirement key is "hold-volume", not "keep-volume"."""
    unknown_requirement = {"key": "hold-volume", "value": "true"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [unknown_requirement],
    }
    keep = self.ns.is_volume_keeping_required(storage_desc)
    self.assertEqual(keep, False)
3764 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3765 """Interfaces are sorted according to position, all have positions."""
3766 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3767 target_vdu
["interfaces"] = [
3770 "ns-vld-id": "datanet",
3775 "ns-vld-id": "mgmtnet",
3779 sorted_interfaces
= [
3782 "ns-vld-id": "mgmtnet",
3787 "ns-vld-id": "datanet",
3791 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3792 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3794 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3795 """Interfaces are sorted according to position, some of them have positions."""
3796 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3797 target_vdu
["interfaces"] = [
3800 "ns-vld-id": "mgmtnet",
3804 "ns-vld-id": "datanet",
3808 sorted_interfaces
= [
3811 "ns-vld-id": "datanet",
3816 "ns-vld-id": "mgmtnet",
3819 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3820 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting a VDU whose interface list is empty leaves it empty."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []
    self.ns._sort_vdu_interfaces(vdu)
    self.assertEqual(vdu["interfaces"], [])
3830 def test_partially_locate_vdu_interfaces(self
):
3831 """Some interfaces have positions."""
3832 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3833 target_vdu
["interfaces"] = [
3836 "ns-vld-id": "net1",
3838 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3841 "ns-vld-id": "mgmtnet",
3845 "ns-vld-id": "datanet",
3849 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3850 self
.assertDictEqual(
3851 target_vdu
["interfaces"][0],
3854 "ns-vld-id": "datanet",
3858 self
.assertDictEqual(
3859 target_vdu
["interfaces"][2],
3860 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3863 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3864 """Some interfaces have positions, position start from 0."""
3865 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3866 target_vdu
["interfaces"] = [
3869 "ns-vld-id": "net1",
3871 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3874 "ns-vld-id": "mgmtnet",
3878 "ns-vld-id": "datanet",
3882 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3883 self
.assertDictEqual(
3884 target_vdu
["interfaces"][0],
3887 "ns-vld-id": "datanet",
3891 self
.assertDictEqual(
3892 target_vdu
["interfaces"][3],
3893 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Interfaces without any position are expected to come back unchanged."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = interfaces_wthout_positions
    # Snapshot taken before the call so the comparison is independent of any
    # in-place change made to the list.
    interfaces_before = deepcopy(vdu["interfaces"])
    self.ns._partially_locate_vdu_interfaces(vdu)
    self.assertEqual(vdu["interfaces"], interfaces_before)
3904 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3905 """All interfaces have position."""
3906 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3907 target_vdu
["interfaces"] = interfaces_wth_all_positions
3908 expected_interfaces
= [
3911 "ns-vld-id": "net2",
3916 "ns-vld-id": "mgmtnet",
3921 "ns-vld-id": "net1",
3925 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3926 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3928 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3929 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3930 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3931 """Target_vdu has cloud-init and boot-data-drive."""
3932 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3933 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3934 target_vdu
["boot-data-drive"] = "vda"
3936 mock_get_cloud_init
.return_value
= cloud_init_content
3937 mock_parse_jinja2
.return_value
= user_data
3939 "user-data": user_data
,
3940 "boot-data-drive": "vda",
3942 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3943 self
.assertDictEqual(result
, expected_result
)
3944 mock_get_cloud_init
.assert_called_once_with(
3945 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3947 mock_parse_jinja2
.assert_called_once_with(
3948 cloud_init_content
=cloud_init_content
,
3950 context
="sample-cloud-init-path",
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception.

    The NsException raised by the patched _get_cloud_init must reach the
    caller unchanged, and _parse_jinja2 must not be invoked.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.side_effect = NsException(
        "Mismatch descriptor for cloud init."
    )

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
    # Checked after the context exits: a statement placed after the raising
    # call inside the "with" body would never execute.
    self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")

    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_not_called()
3976 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3977 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3978 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3979 self
, mock_parse_jinja2
, mock_get_cloud_init
3981 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3982 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3983 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3984 target_vdu
["boot-data-drive"] = "vda"
3986 mock_get_cloud_init
.return_value
= cloud_init_content
3987 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3989 with self
.assertRaises(NsException
) as err
:
3990 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3991 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3992 mock_get_cloud_init
.assert_called_once_with(
3993 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3995 mock_parse_jinja2
.assert_called_once_with(
3996 cloud_init_content
=cloud_init_content
,
3998 context
="sample-cloud-init-path",
4001 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4002 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4003 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
4004 self
, mock_parse_jinja2
, mock_get_cloud_init
4006 """Target_vdu has cloud-init but do not have boot-data-drive."""
4007 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4008 target_vdu
["cloud-init"] = "sample-cloud-init-path"
4010 mock_get_cloud_init
.return_value
= cloud_init_content
4011 mock_parse_jinja2
.return_value
= user_data
4013 "user-data": user_data
,
4015 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4016 self
.assertDictEqual(result
, expected_result
)
4017 mock_get_cloud_init
.assert_called_once_with(
4018 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
4020 mock_parse_jinja2
.assert_called_once_with(
4021 cloud_init_content
=cloud_init_content
,
4023 context
="sample-cloud-init-path",
4026 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4027 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4028 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
4029 self
, mock_parse_jinja2
, mock_get_cloud_init
4031 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
4032 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4033 target_vdu
["cloud-init"] = "sample-cloud-init-path"
4034 target_vdu
["boot-data-drive"] = "vda"
4035 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
4036 mock_parse_jinja2
.return_value
= user_data
4038 "user-data": user_data
,
4039 "boot-data-drive": "vda",
4041 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4042 self
.assertDictEqual(result
, expected_result
)
4043 mock_get_cloud_init
.assert_not_called()
4044 mock_parse_jinja2
.assert_called_once_with(
4045 cloud_init_content
=cloud_init_content
,
4047 context
="sample-cloud-init-path",
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_no_cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Without a cloud-init entry only the boot-data-drive is expected back.

    Neither the cloud-init loader nor the jinja2 parser should be called.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["boot-data-drive"] = "vda"
    cloud_init_cache = {}
    outcome = self.ns._prepare_vdu_cloud_init(vdu, cloud_init_cache, db, fs)
    self.assertDictEqual(outcome, {"boot-data-drive": "vda"})
    for unused_mock in (mock_get_cloud_init, mock_parse_jinja2):
        unused_mock.assert_not_called()
4067 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
4068 """ns_vld and vnf_vld both exist."""
4071 "ns-vld-id": "mgmtnet",
4072 "vnf-vld-id": "mgmt_cp_int",
4074 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
4075 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4076 interface
, ns_preffix
, vnf_preffix
4078 self
.assertEqual(result
, expected_result
)
def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict is expected to yield an empty net text."""
    empty_interface = {}
    net_text = self.ns._check_vld_information_of_interfaces(
        empty_interface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, "")
4088 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
4089 """Interface dict has only vnf_vld."""
4092 "vnf-vld-id": "mgmt_cp_int",
4094 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
4095 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4096 interface
, ns_preffix
, vnf_preffix
4098 self
.assertEqual(result
, expected_result
)
4100 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
4103 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
4106 "vnf-vld-id": "mgmt_cp_int",
4109 with self
.assertRaises(Exception) as err
:
4110 self
.ns
._check
_vld
_information
_of
_interfaces
(
4111 interface
, ns_preffix
, vnf_preffix
4113 self
.assertEqual(type(err
), TypeError)
4115 def test_prepare_interface_port_security_has_security_details(self
):
4116 """Interface dict has port security details."""
4119 "ns-vld-id": "mgmtnet",
4120 "vnf-vld-id": "mgmt_cp_int",
4121 "port-security-enabled": True,
4122 "port-security-disable-strategy": "allow-address-pairs",
4124 expected_interface
= {
4126 "ns-vld-id": "mgmtnet",
4127 "vnf-vld-id": "mgmt_cp_int",
4128 "port_security": True,
4129 "port_security_disable_strategy": "allow-address-pairs",
4131 self
.ns
._prepare
_interface
_port
_security
(interface
)
4132 self
.assertDictEqual(interface
, expected_interface
)
def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict must still be empty after the call."""
    empty_interface = {}
    self.ns._prepare_interface_port_security(empty_interface)
    self.assertDictEqual(empty_interface, {})
4141 def test_prepare_interface_port_security_wthout_port_security(self
):
4142 """Interface dict does not have port security details."""
4145 "ns-vld-id": "mgmtnet",
4146 "vnf-vld-id": "mgmt_cp_int",
4148 expected_interface
= {
4150 "ns-vld-id": "mgmtnet",
4151 "vnf-vld-id": "mgmt_cp_int",
4153 self
.ns
._prepare
_interface
_port
_security
(interface
)
4154 self
.assertDictEqual(interface
, expected_interface
)
4156 def test_create_net_item_of_interface_floating_ip_port_security(self
):
4157 """Interface dict has floating ip, port-security details."""
4160 "vcpi": "sample_vcpi",
4161 "port_security": True,
4162 "port_security_disable_strategy": "allow-address-pairs",
4163 "floating_ip": "10.1.1.12",
4164 "ns-vld-id": "mgmtnet",
4165 "vnf-vld-id": "mgmt_cp_int",
4167 net_text
= f
"{ns_preffix}"
4168 expected_net_item
= {
4170 "port_security": True,
4171 "port_security_disable_strategy": "allow-address-pairs",
4172 "floating_ip": "10.1.1.12",
4173 "net_id": f
"TASK-{ns_preffix}",
4176 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4177 self
.assertDictEqual(result
, expected_net_item
)
4179 def test_create_net_item_of_interface_invalid_net_text(self
):
4180 """net-text is invalid."""
4183 "vcpi": "sample_vcpi",
4184 "port_security": True,
4185 "port_security_disable_strategy": "allow-address-pairs",
4186 "floating_ip": "10.1.1.12",
4187 "ns-vld-id": "mgmtnet",
4188 "vnf-vld-id": "mgmt_cp_int",
4191 with self
.assertRaises(TypeError):
4192 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4194 def test_create_net_item_of_interface_empty_interface(self
):
4195 """Interface dict is empty."""
4197 net_text
= ns_preffix
4198 expected_net_item
= {
4199 "net_id": f
"TASK-{ns_preffix}",
4202 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4203 self
.assertDictEqual(result
, expected_net_item
)
4205 @patch("osm_ng_ro.ns.deep_get")
4206 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
4207 """Interface type is SR-IOV."""
4210 "vcpi": "sample_vcpi",
4211 "port_security": True,
4212 "port_security_disable_strategy": "allow-address-pairs",
4213 "floating_ip": "10.1.1.12",
4214 "ns-vld-id": "mgmtnet",
4215 "vnf-vld-id": "mgmt_cp_int",
4218 mock_deep_get
.return_value
= "SR-IOV"
4219 net_text
= ns_preffix
4221 expected_net_item
= {
4226 self
.ns
._prepare
_type
_of
_interface
(
4227 interface
, tasks_by_target_record_id
, net_text
, net_item
4229 self
.assertDictEqual(net_item
, expected_net_item
)
4232 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4234 mock_deep_get
.assert_called_once_with(
4235 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4238 @patch("osm_ng_ro.ns.deep_get")
4239 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4242 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4245 "vcpi": "sample_vcpi",
4246 "port_security": True,
4247 "port_security_disable_strategy": "allow-address-pairs",
4248 "floating_ip": "10.1.1.12",
4249 "ns-vld-id": "mgmtnet",
4250 "vnf-vld-id": "mgmt_cp_int",
4251 "type": "PCI-PASSTHROUGH",
4253 mock_deep_get
.return_value
= {}
4254 tasks_by_target_record_id
= {}
4255 net_text
= ns_preffix
4257 expected_net_item
= {
4259 "model": "PCI-PASSTHROUGH",
4260 "type": "PCI-PASSTHROUGH",
4262 self
.ns
._prepare
_type
_of
_interface
(
4263 interface
, tasks_by_target_record_id
, net_text
, net_item
4265 self
.assertDictEqual(net_item
, expected_net_item
)
4266 mock_deep_get
.assert_called_once_with(
4267 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4270 @patch("osm_ng_ro.ns.deep_get")
4271 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4272 """Interface type is mgmt."""
4275 "vcpi": "sample_vcpi",
4276 "port_security": True,
4277 "port_security_disable_strategy": "allow-address-pairs",
4278 "floating_ip": "10.1.1.12",
4279 "ns-vld-id": "mgmtnet",
4280 "vnf-vld-id": "mgmt_cp_int",
4283 tasks_by_target_record_id
= {}
4284 net_text
= ns_preffix
4286 expected_net_item
= {
4289 self
.ns
._prepare
_type
_of
_interface
(
4290 interface
, tasks_by_target_record_id
, net_text
, net_item
4292 self
.assertDictEqual(net_item
, expected_net_item
)
4293 mock_deep_get
.assert_not_called()
4295 @patch("osm_ng_ro.ns.deep_get")
4296 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4297 """Interface type is bridge."""
4300 "vcpi": "sample_vcpi",
4301 "port_security": True,
4302 "port_security_disable_strategy": "allow-address-pairs",
4303 "floating_ip": "10.1.1.12",
4304 "ns-vld-id": "mgmtnet",
4305 "vnf-vld-id": "mgmt_cp_int",
4307 tasks_by_target_record_id
= {}
4308 net_text
= ns_preffix
4310 expected_net_item
= {
4314 self
.ns
._prepare
_type
_of
_interface
(
4315 interface
, tasks_by_target_record_id
, net_text
, net_item
4317 self
.assertDictEqual(net_item
, expected_net_item
)
4318 mock_deep_get
.assert_not_called()
4320 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4321 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4322 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4323 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4324 def test_prepare_vdu_interfaces(
4326 mock_type_of_interface
,
4327 mock_item_of_interface
,
4329 mock_vld_information_of_interface
,
4331 """Prepare vdu interfaces successfully."""
4332 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4335 "ns-vld-id": "net1",
4336 "ip-address": "13.2.12.31",
4337 "mgmt-interface": True,
4341 "vnf-vld-id": "net2",
4342 "mac-address": "d0:94:66:ed:fc:e2",
4346 "ns-vld-id": "mgmtnet",
4348 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4350 "params": "test_params",
4351 "find_params": "test_find_params",
4355 net_text_1
= f
"{ns_preffix}:net1"
4356 net_text_2
= f
"{vnf_preffix}:net2"
4357 net_text_3
= f
"{ns_preffix}:mgmtnet"
4360 "net_id": f
"TASK-{ns_preffix}",
4365 "net_id": f
"TASK-{ns_preffix}",
4370 "net_id": f
"TASK-{ns_preffix}",
4373 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4374 mock_vld_information_of_interface
.side_effect
= [
4380 expected_extra_dict
= {
4381 "params": "test_params",
4382 "find_params": "test_find_params",
4383 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4384 "mgmt_vdu_interface": 0,
4386 updated_net_item1
= deepcopy(net_item_1
)
4387 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4388 updated_net_item2
= deepcopy(net_item_2
)
4389 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4390 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4391 self
.ns
._prepare
_vdu
_interfaces
(
4397 tasks_by_target_record_id
,
4400 _call_mock_vld_information_of_interface
= (
4401 mock_vld_information_of_interface
.call_args_list
4404 _call_mock_vld_information_of_interface
[0][0],
4405 (interface_1
, ns_preffix
, vnf_preffix
),
4408 _call_mock_vld_information_of_interface
[1][0],
4409 (interface_2
, ns_preffix
, vnf_preffix
),
4412 _call_mock_vld_information_of_interface
[2][0],
4413 (interface_3
, ns_preffix
, vnf_preffix
),
4416 _call_mock_port_security
= mock_port_security
.call_args_list
4417 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4418 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4419 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4421 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4422 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4423 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4424 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4426 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4428 _call_mock_type_of_interface
[0][0],
4429 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4432 _call_mock_type_of_interface
[1][0],
4433 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4436 _call_mock_type_of_interface
[2][0],
4437 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4439 self
.assertEqual(net_list
, expected_net_list
)
4440 self
.assertEqual(extra_dict
, expected_extra_dict
)
4441 self
.logger
.error
.assert_not_called()
4443 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4444 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4445 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4446 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4447 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4449 mock_type_of_interface
,
4450 mock_item_of_interface
,
4452 mock_vld_information_of_interface
,
4454 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4455 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4458 "ns-vld-id": "net1",
4459 "ip-address": "13.2.12.31",
4460 "mgmt-interface": True,
4464 "vnf-vld-id": "net2",
4465 "mac-address": "d0:94:66:ed:fc:e2",
4469 "ns-vld-id": "mgmtnet",
4471 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4473 "params": "test_params",
4474 "find_params": "test_find_params",
4477 net_text_1
= f
"{ns_preffix}:net1"
4478 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4480 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4482 expected_extra_dict
= {
4483 "params": "test_params",
4484 "find_params": "test_find_params",
4485 "depends_on": [net_text_1
],
4487 with self
.assertRaises(TypeError):
4488 self
.ns
._prepare
_vdu
_interfaces
(
4494 tasks_by_target_record_id
,
4498 _call_mock_vld_information_of_interface
= (
4499 mock_vld_information_of_interface
.call_args_list
4502 _call_mock_vld_information_of_interface
[0][0],
4503 (interface_1
, ns_preffix
, vnf_preffix
),
4506 _call_mock_port_security
= mock_port_security
.call_args_list
4507 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4509 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4510 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4512 mock_type_of_interface
.assert_not_called()
4513 self
.logger
.error
.assert_not_called()
4514 self
.assertEqual(net_list
, [])
4515 self
.assertEqual(extra_dict
, expected_extra_dict
)
4517 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4518 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4519 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4520 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4521 def test_prepare_vdu_interfaces_vld_information_is_empty(
4523 mock_type_of_interface
,
4524 mock_item_of_interface
,
4526 mock_vld_information_of_interface
,
4528 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4529 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4532 "ns-vld-id": "net1",
4533 "ip-address": "13.2.12.31",
4534 "mgmt-interface": True,
4538 "vnf-vld-id": "net2",
4539 "mac-address": "d0:94:66:ed:fc:e2",
4543 "ns-vld-id": "mgmtnet",
4545 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4547 "params": "test_params",
4548 "find_params": "test_find_params",
4551 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4553 self
.ns
._prepare
_vdu
_interfaces
(
4559 tasks_by_target_record_id
,
4563 _call_mock_vld_information_of_interface
= (
4564 mock_vld_information_of_interface
.call_args_list
4567 _call_mock_vld_information_of_interface
[0][0],
4568 (interface_1
, ns_preffix
, vnf_preffix
),
4571 _call_mock_vld_information_of_interface
[1][0],
4572 (interface_2
, ns_preffix
, vnf_preffix
),
4575 _call_mock_vld_information_of_interface
[2][0],
4576 (interface_3
, ns_preffix
, vnf_preffix
),
4579 _call_logger
= self
.logger
.error
.call_args_list
4582 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4586 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4590 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4592 self
.assertEqual(net_list
, [])
4596 "params": "test_params",
4597 "find_params": "test_find_params",
4602 mock_item_of_interface
.assert_not_called()
4603 mock_port_security
.assert_not_called()
4604 mock_type_of_interface
.assert_not_called()
4606 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4607 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4608 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4609 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4610 def test_prepare_vdu_interfaces_empty_interface_list(
4612 mock_type_of_interface
,
4613 mock_item_of_interface
,
4615 mock_vld_information_of_interface
,
4617 """Prepare vdu interfaces, interface list is empty."""
4618 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4619 target_vdu
["interfaces"] = []
4622 self
.ns
._prepare
_vdu
_interfaces
(
4628 tasks_by_target_record_id
,
4631 mock_type_of_interface
.assert_not_called()
4632 mock_vld_information_of_interface
.assert_not_called()
4633 mock_item_of_interface
.assert_not_called()
4634 mock_port_security
.assert_not_called()
def test_prepare_vdu_ssh_keys(self):
    """VDU ssh key and RO public key are both expected under "key-pairs".

    The VDU declares its own ssh key and requires ssh access.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"ssh-keys": ["sample-ssh-key"], "ssh-access-required": True})
    public_key = {"public_key": "path_of_public_key"}
    config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, public_key, config)
    self.assertDictEqual(
        config,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )
def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """Only the RO public key is expected when the VDU declares no ssh-keys.

    Ssh access is still required on the VDU.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-access-required"] = True
    public_key = {"public_key": "path_of_public_key"}
    config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, public_key, config)
    self.assertDictEqual(
        config, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )
def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """Only the VDU ssh key is expected when ssh access is not required.

    The RO public key is passed in but should not appear in the result.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"ssh-keys": ["sample-ssh-key"], "ssh-access-required": False})
    public_key = {"public_key": "path_of_public_key"}
    config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, public_key, config)
    self.assertDictEqual(config, {"key-pairs": ["sample-ssh-key"]})
4670 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4671 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4672 def test_add_persistent_root_disk_to_disk_list_keep_false(
4673 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4675 """Add persistent root disk to disk_list, keep volume set to False."""
4677 "id": "persistent-root-volume",
4678 "type-of-storage": "persistent-storage:persistent-storage",
4679 "size-of-storage": "10",
4681 mock_select_persistent_root_disk
.return_value
= root_disk
4682 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4683 vnfd
["virtual-storage-desc"][1] = root_disk
4684 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4685 persistent_root_disk
= {}
4687 mock_volume_keeping_required
.return_value
= False
4688 expected_disk_list
= [
4690 "image_id": "ubuntu20.04",
4695 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4696 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4698 self
.assertEqual(disk_list
, expected_disk_list
)
4699 mock_select_persistent_root_disk
.assert_called_once()
4700 mock_volume_keeping_required
.assert_called_once()
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Root disk selection raises AttributeError while building disk_list.

    The error must propagate to the caller, disk_list must stay empty and
    the keep-volume check must never be reached.
    """
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.side_effect = AttributeError
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk = {}
    disk_list = []

    with self.assertRaises(AttributeError):
        self.ns._add_persistent_root_disk_to_disk_list(
            vnfd, target_vdu, persistent_root_disk, disk_list
        )

    # These checks live outside the ``with`` block on purpose: statements
    # placed after the raising call inside the block would never execute.
    self.assertEqual(disk_list, [])
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_not_called()
4727 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4728 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4729 def test_add_persistent_root_disk_to_disk_list_keep_true(
4730 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4732 """Add persistent root disk, keep volume set to True."""
4733 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4734 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4735 mock_volume_keeping_required
.return_value
= True
4737 "id": "persistent-root-volume",
4738 "type-of-storage": "persistent-storage:persistent-storage",
4739 "size-of-storage": "10",
4740 "vdu-storage-requirements": [
4741 {"key": "keep-volume", "value": "true"},
4744 mock_select_persistent_root_disk
.return_value
= root_disk
4745 persistent_root_disk
= {}
4747 expected_disk_list
= [
4749 "image_id": "ubuntu20.04",
4754 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4755 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4757 self
.assertEqual(disk_list
, expected_disk_list
)
4758 mock_volume_keeping_required
.assert_called_once_with(root_disk
)
4760 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4761 def test_add_persistent_ordinary_disk_to_disk_list(
4762 self
, mock_volume_keeping_required
4764 """Add persistent ordinary disk, keep volume set to False."""
4765 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4766 mock_volume_keeping_required
.return_value
= False
4767 persistent_root_disk
= {
4768 "persistent-root-volume": {
4769 "image_id": "ubuntu20.04",
4775 "id": "persistent-volume2",
4776 "type-of-storage": "persistent-storage:persistent-storage",
4777 "size-of-storage": "10",
4779 persistent_ordinary_disk
= {}
4782 expected_disk_list
= [
4786 "multiattach": False,
4787 "name": "persistent-volume2",
4790 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4792 persistent_root_disk
,
4793 persistent_ordinary_disk
,
4797 self
.assertEqual(disk_list
, expected_disk_list
)
4798 mock_volume_keeping_required
.assert_called_once_with(ordinary_disk
)
4800 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4801 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
4802 self
, mock_volume_keeping_required
4804 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4805 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4806 mock_volume_keeping_required
.return_value
= False
4807 persistent_root_disk
= {
4808 "persistent-root-volume": {
4809 "image_id": "ubuntu20.04",
4813 "persistent-volume2": {
4817 persistent_ordinary_disk
= {}
4821 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4823 persistent_root_disk
,
4824 persistent_ordinary_disk
,
4828 self
.assertEqual(disk_list
, [])
4829 mock_volume_keeping_required
.assert_not_called()
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """VNFD without persistent storage: no root disk is added.

    Root disk selection is mocked to return None, so disk_list is expected
    to stay empty and the keep-volume check is expected not to run.
    """
    mock_select_persistent_root_disk.return_value = None
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    root_disks = {}
    disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """The persistent_root_disk dict passed in is empty.

    Root disk selection is mocked to return None, so disk_list is expected
    to stay empty and the keep-volume check is expected not to run.
    """
    mock_select_persistent_root_disk.return_value = None
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    root_disks = {}
    disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()
def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
    """An invalid extra_dict makes the helper raise NsException.

    The exception text is expected to be "Invalid extra_dict format.".
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
    # NOTE(review): a non-dict value is assumed to be what the helper treats
    # as an invalid extra_dict - confirm against the helper.
    extra_dict = []
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)

    # ``err.exception`` is only populated once the ``with`` block has exited,
    # and a statement after the raising call inside the block never runs.
    self.assertEqual(str(err.exception), "Invalid extra_dict format.")
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """The VDU references a single affinity group.

    The returned list is expected to hold one TASK- reference, and the
    group's record id is expected to be appended to ``depends_on``.
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_record = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    extra = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, extra, ns_preffix)

    self.assertDictEqual(extra, {"depends_on": [group_record]})
    self.assertEqual(groups, [{"affinity_group_id": "TASK-" + group_record}])
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """The VDU references two affinity groups.

    Both groups are expected, in order, in the returned list (as TASK-
    references) and in ``depends_on`` (as record ids).
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    first_record = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1"
    second_record = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    extra = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, extra, ns_preffix)

    self.assertDictEqual(extra, {"depends_on": [first_record, second_record]})
    self.assertEqual(
        groups,
        [
            {"affinity_group_id": "TASK-" + first_record},
            {"affinity_group_id": "TASK-" + second_record},
        ],
    )
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """The test adds no affinity group to the VDU.

    The helper is expected to return an empty list and to leave
    ``depends_on`` empty.
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    extra = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, extra, ns_preffix)

    self.assertDictEqual(extra, {"depends_on": []})
    self.assertEqual(groups, [])
4925 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4926 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4927 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4928 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4929 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4930 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4931 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4932 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4933 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4934 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4935 def test_process_vdu_params_with_inst_vol_list(
4937 mock_prepare_vdu_affinity_group_list
,
4938 mock_add_persistent_ordinary_disks_to_disk_list
,
4939 mock_add_persistent_root_disk_to_disk_list
,
4940 mock_find_persistent_volumes
,
4941 mock_find_persistent_root_volumes
,
4942 mock_prepare_vdu_ssh_keys
,
4943 mock_prepare_vdu_cloud_init
,
4944 mock_prepare_vdu_interfaces
,
4945 mock_locate_vdu_interfaces
,
4946 mock_sort_vdu_interfaces
,
4948 """Instantiation volume list is provided."""
4949 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4951 target_vdu
["interfaces"] = interfaces_wth_all_positions
4953 vdu_instantiation_vol_list
= [
4955 "vim-volume-id": vim_volume_id
,
4956 "name": "persistent-volume2",
4959 target_vdu
["additionalParams"] = {
4960 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4962 mock_prepare_vdu_cloud_init
.return_value
= {}
4963 mock_prepare_vdu_affinity_group_list
.return_value
= []
4964 persistent_root_disk
= {
4965 "persistent-root-volume": {
4966 "image_id": "ubuntu20.04",
4970 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4972 new_kwargs
= deepcopy(kwargs
)
4977 "tasks_by_target_record_id": {},
4981 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4982 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4983 db
.get_one
.return_value
= vnfd
4984 result
= Ns
._process
_vdu
_params
(
4985 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4987 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4988 mock_locate_vdu_interfaces
.assert_not_called()
4989 mock_prepare_vdu_cloud_init
.assert_called_once()
4990 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4991 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4992 mock_prepare_vdu_interfaces
.assert_called_once_with(
4994 expected_extra_dict_copy
,
5001 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5002 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5003 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5004 mock_find_persistent_volumes
.assert_called_once_with(
5005 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
5008 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5009 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5010 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5011 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5012 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5013 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5014 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5015 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5016 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5017 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5018 def test_process_vdu_params_wth_affinity_groups(
5020 mock_prepare_vdu_affinity_group_list
,
5021 mock_add_persistent_ordinary_disks_to_disk_list
,
5022 mock_add_persistent_root_disk_to_disk_list
,
5023 mock_find_persistent_volumes
,
5024 mock_find_persistent_root_volumes
,
5025 mock_prepare_vdu_ssh_keys
,
5026 mock_prepare_vdu_cloud_init
,
5027 mock_prepare_vdu_interfaces
,
5028 mock_locate_vdu_interfaces
,
5029 mock_sort_vdu_interfaces
,
5031 """There are affinity groups."""
5032 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5035 target_vdu
["interfaces"] = interfaces_wth_all_positions
5036 mock_prepare_vdu_cloud_init
.return_value
= {}
5037 mock_prepare_vdu_affinity_group_list
.return_value
= [
5042 new_kwargs
= deepcopy(kwargs
)
5047 "tasks_by_target_record_id": {},
5051 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5052 expected_extra_dict3
["params"]["affinity_group_list"] = [
5056 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5057 db
.get_one
.return_value
= vnfd
5058 result
= Ns
._process
_vdu
_params
(
5059 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5061 self
.assertDictEqual(result
, expected_extra_dict3
)
5062 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5063 mock_locate_vdu_interfaces
.assert_not_called()
5064 mock_prepare_vdu_cloud_init
.assert_called_once()
5065 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5066 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5067 mock_prepare_vdu_interfaces
.assert_called_once_with(
5069 expected_extra_dict3
,
5077 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5078 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5079 mock_find_persistent_volumes
.assert_not_called()
5081 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5082 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5083 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5084 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5085 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5086 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5087 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5088 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5089 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5090 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5091 def test_process_vdu_params_wth_cloud_config(
5093 mock_prepare_vdu_affinity_group_list
,
5094 mock_add_persistent_ordinary_disks_to_disk_list
,
5095 mock_add_persistent_root_disk_to_disk_list
,
5096 mock_find_persistent_volumes
,
5097 mock_find_persistent_root_volumes
,
5098 mock_prepare_vdu_ssh_keys
,
5099 mock_prepare_vdu_cloud_init
,
5100 mock_prepare_vdu_interfaces
,
5101 mock_locate_vdu_interfaces
,
5102 mock_sort_vdu_interfaces
,
5104 """There is cloud-config."""
5105 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5108 target_vdu
["interfaces"] = interfaces_wth_all_positions
5109 mock_prepare_vdu_cloud_init
.return_value
= {
5110 "user-data": user_data
,
5111 "boot-data-drive": "vda",
5113 mock_prepare_vdu_affinity_group_list
.return_value
= []
5115 new_kwargs
= deepcopy(kwargs
)
5120 "tasks_by_target_record_id": {},
5124 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5125 expected_extra_dict3
["params"]["cloud_config"] = {
5126 "user-data": user_data
,
5127 "boot-data-drive": "vda",
5129 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5130 db
.get_one
.return_value
= vnfd
5131 result
= Ns
._process
_vdu
_params
(
5132 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5134 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5135 mock_locate_vdu_interfaces
.assert_not_called()
5136 mock_prepare_vdu_cloud_init
.assert_called_once()
5137 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5138 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5139 mock_prepare_vdu_interfaces
.assert_called_once_with(
5141 expected_extra_dict3
,
5148 self
.assertDictEqual(result
, expected_extra_dict3
)
5149 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
5150 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
5152 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5153 mock_find_persistent_volumes
.assert_not_called()
5155 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5156 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5157 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5158 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5159 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5160 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5161 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5162 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5163 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5164 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5165 def test_process_vdu_params_wthout_persistent_storage(
5167 mock_prepare_vdu_affinity_group_list
,
5168 mock_add_persistent_ordinary_disks_to_disk_list
,
5169 mock_add_persistent_root_disk_to_disk_list
,
5170 mock_find_persistent_volumes
,
5171 mock_find_persistent_root_volumes
,
5172 mock_prepare_vdu_ssh_keys
,
5173 mock_prepare_vdu_cloud_init
,
5174 mock_prepare_vdu_interfaces
,
5175 mock_locate_vdu_interfaces
,
5176 mock_sort_vdu_interfaces
,
5178 """There is not any persistent storage."""
5179 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5182 target_vdu
["interfaces"] = interfaces_wth_all_positions
5183 mock_prepare_vdu_cloud_init
.return_value
= {}
5184 mock_prepare_vdu_affinity_group_list
.return_value
= []
5186 new_kwargs
= deepcopy(kwargs
)
5191 "tasks_by_target_record_id": {},
5195 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
5196 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
5197 db
.get_one
.return_value
= vnfd
5198 result
= Ns
._process
_vdu
_params
(
5199 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5201 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5202 mock_locate_vdu_interfaces
.assert_not_called()
5203 mock_prepare_vdu_cloud_init
.assert_called_once()
5204 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5205 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5206 mock_prepare_vdu_interfaces
.assert_called_once_with(
5208 expected_extra_dict_copy
,
5215 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5216 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5217 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5218 mock_find_persistent_volumes
.assert_not_called()
5220 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5221 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5222 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5223 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5224 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5225 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5226 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5227 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5228 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5229 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5230 def test_process_vdu_params_interfaces_partially_located(
5232 mock_prepare_vdu_affinity_group_list
,
5233 mock_add_persistent_ordinary_disks_to_disk_list
,
5234 mock_add_persistent_root_disk_to_disk_list
,
5235 mock_find_persistent_volumes
,
5236 mock_find_persistent_root_volumes
,
5237 mock_prepare_vdu_ssh_keys
,
5238 mock_prepare_vdu_cloud_init
,
5239 mock_prepare_vdu_interfaces
,
5240 mock_locate_vdu_interfaces
,
5241 mock_sort_vdu_interfaces
,
5243 """Some interfaces have position."""
5244 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5247 target_vdu
["interfaces"] = [
5250 "ns-vld-id": "net1",
5252 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5255 "ns-vld-id": "mgmtnet",
5258 mock_prepare_vdu_cloud_init
.return_value
= {}
5259 mock_prepare_vdu_affinity_group_list
.return_value
= []
5260 persistent_root_disk
= {
5261 "persistent-root-volume": {
5262 "image_id": "ubuntu20.04",
5267 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5269 new_kwargs
= deepcopy(kwargs
)
5274 "tasks_by_target_record_id": {},
5279 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5280 db
.get_one
.return_value
= vnfd
5281 result
= Ns
._process
_vdu
_params
(
5282 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5284 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5285 mock_sort_vdu_interfaces
.assert_not_called()
5286 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5287 mock_prepare_vdu_cloud_init
.assert_called_once()
5288 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5289 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5290 mock_prepare_vdu_interfaces
.assert_called_once_with(
5292 expected_extra_dict_copy
,
5299 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5300 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5301 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5302 mock_find_persistent_volumes
.assert_not_called()
5303 mock_find_persistent_root_volumes
.assert_not_called()
5305 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5306 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5307 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5308 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5309 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5310 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5311 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5312 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5313 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5314 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5315 def test_process_vdu_params_no_interface_position(
5317 mock_prepare_vdu_affinity_group_list
,
5318 mock_add_persistent_ordinary_disks_to_disk_list
,
5319 mock_add_persistent_root_disk_to_disk_list
,
5320 mock_find_persistent_volumes
,
5321 mock_find_persistent_root_volumes
,
5322 mock_prepare_vdu_ssh_keys
,
5323 mock_prepare_vdu_cloud_init
,
5324 mock_prepare_vdu_interfaces
,
5325 mock_locate_vdu_interfaces
,
5326 mock_sort_vdu_interfaces
,
5328 """Interfaces do not have position."""
5329 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5332 target_vdu
["interfaces"] = interfaces_wthout_positions
5333 mock_prepare_vdu_cloud_init
.return_value
= {}
5334 mock_prepare_vdu_affinity_group_list
.return_value
= []
5335 persistent_root_disk
= {
5336 "persistent-root-volume": {
5337 "image_id": "ubuntu20.04",
5342 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5343 new_kwargs
= deepcopy(kwargs
)
5348 "tasks_by_target_record_id": {},
5353 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5354 db
.get_one
.return_value
= vnfd
5355 result
= Ns
._process
_vdu
_params
(
5356 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5358 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5359 mock_sort_vdu_interfaces
.assert_not_called()
5360 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5361 mock_prepare_vdu_cloud_init
.assert_called_once()
5362 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5363 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5364 mock_prepare_vdu_interfaces
.assert_called_once_with(
5366 expected_extra_dict_copy
,
5373 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5374 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5375 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5376 mock_find_persistent_volumes
.assert_not_called()
5377 mock_find_persistent_root_volumes
.assert_not_called()
5379 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5380 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5381 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5382 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5383 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5384 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5385 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5386 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5387 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5388 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5389 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5391 mock_prepare_vdu_affinity_group_list
,
5392 mock_add_persistent_ordinary_disks_to_disk_list
,
5393 mock_add_persistent_root_disk_to_disk_list
,
5394 mock_find_persistent_volumes
,
5395 mock_find_persistent_root_volumes
,
5396 mock_prepare_vdu_ssh_keys
,
5397 mock_prepare_vdu_cloud_init
,
5398 mock_prepare_vdu_interfaces
,
5399 mock_locate_vdu_interfaces
,
5400 mock_sort_vdu_interfaces
,
5402 """Prepare vdu interfaces method raises exception."""
5403 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5406 target_vdu
["interfaces"] = interfaces_wthout_positions
5407 mock_prepare_vdu_cloud_init
.return_value
= {}
5408 mock_prepare_vdu_affinity_group_list
.return_value
= []
5409 persistent_root_disk
= {
5410 "persistent-root-volume": {
5411 "image_id": "ubuntu20.04",
5416 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5417 new_kwargs
= deepcopy(kwargs
)
5422 "tasks_by_target_record_id": {},
5426 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5428 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5429 db
.get_one
.return_value
= vnfd
5430 with self
.assertRaises(Exception) as err
:
5431 Ns
._process
_vdu
_params
(
5432 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5434 self
.assertEqual(type(err
), TypeError)
5435 mock_sort_vdu_interfaces
.assert_not_called()
5436 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5437 mock_prepare_vdu_cloud_init
.assert_not_called()
5438 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5439 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5440 mock_prepare_vdu_interfaces
.assert_called_once()
5441 mock_prepare_vdu_ssh_keys
.assert_not_called()
5442 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5443 mock_find_persistent_volumes
.assert_not_called()
5444 mock_find_persistent_root_volumes
.assert_not_called()
5446 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5447 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5448 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5449 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5450 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5451 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5452 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5453 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5454 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5455 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5456 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5458 mock_prepare_vdu_affinity_group_list
,
5459 mock_add_persistent_ordinary_disks_to_disk_list
,
5460 mock_add_persistent_root_disk_to_disk_list
,
5461 mock_find_persistent_volumes
,
5462 mock_find_persistent_root_volumes
,
5463 mock_prepare_vdu_ssh_keys
,
5464 mock_prepare_vdu_cloud_init
,
5465 mock_prepare_vdu_interfaces
,
5466 mock_locate_vdu_interfaces
,
5467 mock_sort_vdu_interfaces
,
5469 """Add persistent root disk method raises exception."""
5470 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5473 target_vdu
["interfaces"] = interfaces_wthout_positions
5474 mock_prepare_vdu_cloud_init
.return_value
= {}
5475 mock_prepare_vdu_affinity_group_list
.return_value
= []
5476 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5477 new_kwargs
= deepcopy(kwargs
)
5482 "tasks_by_target_record_id": {},
5487 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5488 db
.get_one
.return_value
= vnfd
5489 with self
.assertRaises(Exception) as err
:
5490 Ns
._process
_vdu
_params
(
5491 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5493 self
.assertEqual(type(err
), KeyError)
5494 mock_sort_vdu_interfaces
.assert_not_called()
5495 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5496 mock_prepare_vdu_cloud_init
.assert_called_once()
5497 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5498 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5499 mock_prepare_vdu_interfaces
.assert_called_once_with(
5503 f
"{ns_preffix}:image.0",
5504 f
"{ns_preffix}:flavor.0",
5514 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5515 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5516 mock_find_persistent_volumes
.assert_not_called()
5517 mock_find_persistent_root_volumes
.assert_not_called()
5519 def test_select_persistent_root_disk(self
):
5520 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5521 vdu
["virtual-storage-desc"] = [
5522 "persistent-root-volume",
5523 "persistent-volume2",
5526 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5527 expected_result
= vsd
5528 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5529 self
.assertEqual(result
, expected_result
)
5531 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5532 """VDU first virtual-storage-desc is different than vsd id."""
5533 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5534 vdu
["virtual-storage-desc"] = [
5535 "persistent-volume2",
5536 "persistent-root-volume",
5539 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5540 expected_result
= None
5541 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5542 self
.assertEqual(result
, expected_result
)
5544 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5545 """vsd type is not persistent."""
5546 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5547 vdu
["virtual-storage-desc"] = [
5548 "persistent-volume2",
5549 "persistent-root-volume",
5552 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5553 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5554 expected_result
= None
5555 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5556 self
.assertEqual(result
, expected_result
)
5558 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5559 """vsd size is None."""
5560 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5561 vdu
["virtual-storage-desc"] = [
5562 "persistent-volume2",
5563 "persistent-root-volume",
5566 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5567 vsd
["size-of-storage"] = None
5568 expected_result
= None
5569 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5570 self
.assertEqual(result
, expected_result
)
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """The VDU fixture is passed unmodified; no root disk is expected.

    The second virtual-storage-desc of the VNFD fixture is the candidate,
    and the selection is expected to return None.
    """
    candidate = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vdu_record = deepcopy(target_vdu_wth_persistent_storage)

    selected = Ns._select_persistent_root_disk(candidate, vdu_record)

    self.assertEqual(selected, None)
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """The whole virtual-storage-desc list is passed instead of one dict.

    The selection is expected to raise AttributeError.
    """
    vdu_record = deepcopy(target_vdu_wth_persistent_storage)
    descriptor_list = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]

    with self.assertRaises(AttributeError):
        Ns._select_persistent_root_disk(descriptor_list, vdu_record)