1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
74 "vdu-storage-requirements": [
75 {"key": "keep-volume", "value": "true"},
79 "id": "ephemeral-volume",
80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
81 "size-of-storage": "1",
87 "path": "/app/storage/",
92 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
93 task_by_target_record_id
= {
94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
95 "extra_dict": {"params": {"net_type": "SR-IOV"}}
98 interfaces_wthout_positions
= [
109 "ns-vld-id": "mgmtnet",
112 interfaces_wth_all_positions
= [
125 "ns-vld-id": "mgmtnet",
129 target_vdu_wth_persistent_storage
= {
130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
133 "vdu-name": "several_volumes-VM",
137 "ns-vld-id": "mgmtnet",
140 "virtual-storages": [
142 "id": "persistent-volume2",
143 "size-of-storage": "10",
144 "type-of-storage": "persistent-storage:persistent-storage",
147 "id": "persistent-root-volume",
148 "size-of-storage": "10",
149 "type-of-storage": "persistent-storage:persistent-storage",
150 "vdu-storage-requirements": [
151 {"key": "keep-volume", "value": "true"},
155 "id": "ephemeral-volume",
156 "size-of-storage": "1",
157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
161 db
= MagicMock(name
="database mock")
162 fs
= MagicMock(name
="database mock")
163 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
164 vnf_preffix
= "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
165 vnfr_id
= "wh47f48-y870-4169-b758-5732e1ff40f5"
166 nsr_id
= "th47f48-9870-4169-b758-9732e1ff40f3"
168 "name": "sample_name",
170 expected_extra_dict
= {
172 f
"{ns_preffix}:image.0",
173 f
"{ns_preffix}:flavor.0",
176 "affinity_group_list": [],
177 "availability_zone_index": None,
178 "availability_zone_list": None,
179 "cloud_config": None,
180 "description": "several_volumes-VM",
182 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
183 "image_id": f
"TASK-{ns_preffix}:image.0",
184 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
190 expected_extra_dict2
= {
192 f
"{ns_preffix}:image.0",
193 f
"{ns_preffix}:flavor.0",
196 "affinity_group_list": [],
197 "availability_zone_index": None,
198 "availability_zone_list": None,
199 "cloud_config": None,
200 "description": "without_volumes-VM",
202 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
203 "image_id": f
"TASK-{ns_preffix}:image.0",
204 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
210 expected_extra_dict3
= {
212 f
"{ns_preffix}:image.0",
215 "affinity_group_list": [],
216 "availability_zone_index": None,
217 "availability_zone_list": None,
218 "cloud_config": None,
219 "description": "without_volumes-VM",
221 "flavor_id": "flavor_test",
222 "image_id": f
"TASK-{ns_preffix}:image.0",
223 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
228 tasks_by_target_record_id
= {
229 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
232 "net_type": "SR-IOV",
239 "vdu2cloud_init": {},
241 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
242 "member-vnf-index-ref": "vnf-several-volumes",
245 vnfd_wthout_persistent_storage
= {
246 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
250 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
253 "id": "without_volumes-vnf",
254 "product-name": "without_volumes-vnf",
257 "id": "without_volumes-VM",
258 "name": "without_volumes-VM",
259 "sw-image-desc": "ubuntu20.04",
260 "alternative-sw-image-desc": [
264 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
268 "virtual-storage-desc": [
269 {"id": "root-volume", "size-of-storage": "10"},
271 "id": "ephemeral-volume",
272 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
273 "size-of-storage": "1",
279 "path": "/app/storage/",
285 target_vdu_wthout_persistent_storage
= {
286 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
289 "vdu-name": "without_volumes-VM",
293 "ns-vld-id": "mgmtnet",
296 "virtual-storages": [
299 "size-of-storage": "10",
302 "id": "ephemeral-volume",
303 "size-of-storage": "1",
304 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
308 cloud_init_content
= """
313 overwrite: {{is_override}}
316 - [ sh, -xc, "echo $(date) '{{command}}'" ]
327 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
329 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
330 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
331 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
332 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
333 nsr_id_2
= "993166fe-723e-4680-ac4b-b1af2541ae31"
334 target_record_1
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
336 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
337 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
339 expected_result_vertical_scale
= {
340 "target_id": target_vim
,
341 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
342 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
343 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
344 "status": "SCHEDULED",
346 "item": "verticalscale",
347 "target_record": target_record_1
,
348 "target_record_id": target_record_id
,
350 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
351 "flavor_dict": "flavor_dict",
356 "vim_info": {target_vim
: {"interfaces": []}},
358 vnf
= {"_id": vnf_id
}
359 extra_dict_vertical_scale
= {
361 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
362 "flavor_dict": "flavor_dict",
365 extra_dict_migrate
= {
367 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
368 "migrate_host": "migrateToHost",
371 expected_result_migrate
= {
372 "target_id": target_vim
,
373 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
374 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
375 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
376 "status": "SCHEDULED",
379 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
380 "target_record_id": target_record_id
,
382 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
383 "migrate_host": "migrateToHost",
386 expected_result_rebuild_start_stop
= {
387 "target_id": target_vim
,
388 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
389 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
390 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
391 "status": "SCHEDULED",
394 "target_record_id": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
398 class TestException(Exception):
402 class CopyingMock(MagicMock
):
403 def __call__(self
, *args
, **kwargs
):
404 args
= deepcopy(args
)
405 kwargs
= deepcopy(kwargs
)
406 return super(CopyingMock
, self
).__call
__(*args
, **kwargs
)
409 class TestNs(unittest
.TestCase
):
413 def test__create_task_without_extra_dict(self
):
415 "target_id": "vim_openstack_1",
416 "action_id": "123456",
418 "task_id": "123456:1",
419 "status": "SCHEDULED",
422 "target_record": "test_target_record",
423 "target_record_id": "test_target_record_id",
426 "action_id": "123456",
431 task
= Ns
._create
_task
(
432 deployment_info
=deployment_info
,
433 target_id
="vim_openstack_1",
436 target_record
="test_target_record",
437 target_record_id
="test_target_record_id",
440 self
.assertEqual(deployment_info
.get("task_index"), 2)
441 self
.assertDictEqual(task
, expected_result
)
443 def test__create_task(self
):
445 "target_id": "vim_openstack_1",
446 "action_id": "123456",
448 "task_id": "123456:1",
449 "status": "SCHEDULED",
452 "target_record": "test_target_record",
453 "target_record_id": "test_target_record_id",
454 # values coming from extra_dict
455 "params": "test_params",
456 "find_params": "test_find_params",
457 "depends_on": "test_depends_on",
460 "action_id": "123456",
465 task
= Ns
._create
_task
(
466 deployment_info
=deployment_info
,
467 target_id
="vim_openstack_1",
470 target_record
="test_target_record",
471 target_record_id
="test_target_record_id",
473 "params": "test_params",
474 "find_params": "test_find_params",
475 "depends_on": "test_depends_on",
479 self
.assertEqual(deployment_info
.get("task_index"), 2)
480 self
.assertDictEqual(task
, expected_result
)
482 @patch("osm_ng_ro.ns.time")
483 def test__create_ro_task(self
, mock_time
: Mock
):
484 now
= 1637324838.994551
485 mock_time
.return_value
= now
487 "target_id": "vim_openstack_1",
488 "action_id": "123456",
490 "task_id": "123456:1",
491 "status": "SCHEDULED",
494 "target_record": "test_target_record",
495 "target_record_id": "test_target_record_id",
496 # values coming from extra_dict
497 "params": "test_params",
498 "find_params": "test_find_params",
499 "depends_on": "test_depends_on",
505 "target_id": "vim_openstack_1",
508 "created_items": None,
522 ro_task
= Ns
._create
_ro
_task
(
523 target_id
="vim_openstack_1",
527 self
.assertDictEqual(ro_task
, expected_result
)
529 def test__process_image_params_with_empty_target_image(self
):
535 result
= Ns
._process
_image
_params
(
536 target_image
=target_image
,
539 target_record_id
=None,
542 self
.assertDictEqual(expected_result
, result
)
544 def test__process_image_params_with_wrong_target_image(self
):
549 "no_image": "to_see_here",
552 result
= Ns
._process
_image
_params
(
553 target_image
=target_image
,
556 target_record_id
=None,
559 self
.assertDictEqual(expected_result
, result
)
561 def test__process_image_params_with_image(self
):
573 result
= Ns
._process
_image
_params
(
574 target_image
=target_image
,
577 target_record_id
=None,
580 self
.assertDictEqual(expected_result
, result
)
582 def test__process_image_params_with_vim_image_id(self
):
591 "vim_image_id": "123456",
594 result
= Ns
._process
_image
_params
(
595 target_image
=target_image
,
598 target_record_id
=None,
601 self
.assertDictEqual(expected_result
, result
)
603 def test__process_image_params_with_image_checksum(self
):
607 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
612 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
615 result
= Ns
._process
_image
_params
(
616 target_image
=target_image
,
619 target_record_id
=None,
622 self
.assertDictEqual(expected_result
, result
)
624 def test__get_resource_allocation_params_with_empty_target_image(self
):
626 quota_descriptor
= {}
628 result
= Ns
._get
_resource
_allocation
_params
(
629 quota_descriptor
=quota_descriptor
,
632 self
.assertDictEqual(expected_result
, result
)
634 def test__get_resource_allocation_params_with_wrong_target_image(self
):
637 "no_quota": "present_here",
640 result
= Ns
._get
_resource
_allocation
_params
(
641 quota_descriptor
=quota_descriptor
,
644 self
.assertDictEqual(expected_result
, result
)
646 def test__get_resource_allocation_params_with_limit(self
):
654 result
= Ns
._get
_resource
_allocation
_params
(
655 quota_descriptor
=quota_descriptor
,
658 self
.assertDictEqual(expected_result
, result
)
660 def test__get_resource_allocation_params_with_reserve(self
):
668 result
= Ns
._get
_resource
_allocation
_params
(
669 quota_descriptor
=quota_descriptor
,
672 self
.assertDictEqual(expected_result
, result
)
674 def test__get_resource_allocation_params_with_shares(self
):
682 result
= Ns
._get
_resource
_allocation
_params
(
683 quota_descriptor
=quota_descriptor
,
686 self
.assertDictEqual(expected_result
, result
)
688 def test__get_resource_allocation_params(self
):
700 result
= Ns
._get
_resource
_allocation
_params
(
701 quota_descriptor
=quota_descriptor
,
704 self
.assertDictEqual(expected_result
, result
)
706 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
707 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
715 result
= Ns
._process
_guest
_epa
_quota
_params
(
716 guest_epa_quota
=guest_epa_quota
,
717 epa_vcpu_set
=epa_vcpu_set
,
720 self
.assertDictEqual(expected_result
, result
)
721 self
.assertFalse(resource_allocation
.called
)
723 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
724 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
732 result
= Ns
._process
_guest
_epa
_quota
_params
(
733 guest_epa_quota
=guest_epa_quota
,
734 epa_vcpu_set
=epa_vcpu_set
,
737 self
.assertDictEqual(expected_result
, result
)
738 self
.assertFalse(resource_allocation
.called
)
740 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
741 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
747 "no-quota": "nothing",
751 result
= Ns
._process
_guest
_epa
_quota
_params
(
752 guest_epa_quota
=guest_epa_quota
,
753 epa_vcpu_set
=epa_vcpu_set
,
756 self
.assertDictEqual(expected_result
, result
)
757 self
.assertFalse(resource_allocation
.called
)
759 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
760 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
766 "no-quota": "nothing",
770 result
= Ns
._process
_guest
_epa
_quota
_params
(
771 guest_epa_quota
=guest_epa_quota
,
772 epa_vcpu_set
=epa_vcpu_set
,
775 self
.assertDictEqual(expected_result
, result
)
776 self
.assertFalse(resource_allocation
.called
)
778 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
779 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
793 result
= Ns
._process
_guest
_epa
_quota
_params
(
794 guest_epa_quota
=guest_epa_quota
,
795 epa_vcpu_set
=epa_vcpu_set
,
798 self
.assertDictEqual(expected_result
, result
)
799 self
.assertFalse(resource_allocation
.called
)
801 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
802 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
822 resource_allocation_param
= {
827 resource_allocation
.return_value
= {
833 result
= Ns
._process
_guest
_epa
_quota
_params
(
834 guest_epa_quota
=guest_epa_quota
,
835 epa_vcpu_set
=epa_vcpu_set
,
838 resource_allocation
.assert_called_once_with(resource_allocation_param
)
839 self
.assertDictEqual(expected_result
, result
)
841 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
842 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
862 resource_allocation_param
= {
867 resource_allocation
.return_value
= {
873 result
= Ns
._process
_guest
_epa
_quota
_params
(
874 guest_epa_quota
=guest_epa_quota
,
875 epa_vcpu_set
=epa_vcpu_set
,
878 resource_allocation
.assert_called_once_with(resource_allocation_param
)
879 self
.assertDictEqual(expected_result
, result
)
881 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
882 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
902 resource_allocation_param
= {
907 resource_allocation
.return_value
= {
913 result
= Ns
._process
_guest
_epa
_quota
_params
(
914 guest_epa_quota
=guest_epa_quota
,
915 epa_vcpu_set
=epa_vcpu_set
,
918 resource_allocation
.assert_called_once_with(resource_allocation_param
)
919 self
.assertDictEqual(expected_result
, result
)
921 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
922 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
942 resource_allocation_param
= {
947 resource_allocation
.return_value
= {
953 result
= Ns
._process
_guest
_epa
_quota
_params
(
954 guest_epa_quota
=guest_epa_quota
,
955 epa_vcpu_set
=epa_vcpu_set
,
958 resource_allocation
.assert_called_once_with(resource_allocation_param
)
959 self
.assertDictEqual(expected_result
, result
)
961 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
962 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
982 resource_allocation_param
= {
987 resource_allocation
.return_value
= {
993 result
= Ns
._process
_guest
_epa
_quota
_params
(
994 guest_epa_quota
=guest_epa_quota
,
995 epa_vcpu_set
=epa_vcpu_set
,
998 resource_allocation
.assert_called_once_with(resource_allocation_param
)
999 self
.assertDictEqual(expected_result
, result
)
1001 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1002 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
1004 resource_allocation
,
1022 resource_allocation_param
= {
1027 resource_allocation
.return_value
= {
1033 result
= Ns
._process
_guest
_epa
_quota
_params
(
1034 guest_epa_quota
=guest_epa_quota
,
1035 epa_vcpu_set
=epa_vcpu_set
,
1038 resource_allocation
.assert_called_once_with(resource_allocation_param
)
1039 self
.assertDictEqual(expected_result
, result
)
1041 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1042 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
1044 resource_allocation
,
1060 epa_vcpu_set
= False
1062 resource_allocation_param
= {
1067 resource_allocation
.return_value
= {
1073 result
= Ns
._process
_guest
_epa
_quota
_params
(
1074 guest_epa_quota
=guest_epa_quota
,
1075 epa_vcpu_set
=epa_vcpu_set
,
1078 resource_allocation
.assert_called_once_with(resource_allocation_param
)
1079 self
.assertDictEqual(expected_result
, result
)
1081 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1082 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1084 resource_allocation
,
1127 resource_allocation
.return_value
= {
1133 result
= Ns
._process
_guest
_epa
_quota
_params
(
1134 guest_epa_quota
=guest_epa_quota
,
1135 epa_vcpu_set
=epa_vcpu_set
,
1138 self
.assertTrue(resource_allocation
.called
)
1139 self
.assertDictEqual(expected_result
, result
)
1141 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1142 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1144 resource_allocation
,
1190 epa_vcpu_set
= False
1192 resource_allocation
.return_value
= {
1198 result
= Ns
._process
_guest
_epa
_quota
_params
(
1199 guest_epa_quota
=guest_epa_quota
,
1200 epa_vcpu_set
=epa_vcpu_set
,
1203 self
.assertTrue(resource_allocation
.called
)
1204 self
.assertDictEqual(expected_result
, result
)
1206 def test__process_guest_epa_numa_params_with_empty_numa_params(self
):
1207 expected_numa_result
= []
1208 expected_epa_vcpu_set_result
= False
1209 guest_epa_quota
= {}
1211 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1212 guest_epa_quota
=guest_epa_quota
,
1214 self
.assertEqual(expected_numa_result
, numa_result
)
1215 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1217 def test__process_guest_epa_numa_params_with_wrong_numa_params(self
):
1218 expected_numa_result
= []
1219 expected_epa_vcpu_set_result
= False
1220 guest_epa_quota
= {"no_nume": "here"}
1222 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1223 guest_epa_quota
=guest_epa_quota
,
1226 self
.assertEqual(expected_numa_result
, numa_result
)
1227 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1229 def test__process_guest_epa_numa_params_with_numa_node_policy(self
):
1230 expected_numa_result
= []
1231 expected_epa_vcpu_set_result
= False
1232 guest_epa_quota
= {"numa-node-policy": {}}
1234 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1235 guest_epa_quota
=guest_epa_quota
,
1238 self
.assertEqual(expected_numa_result
, numa_result
)
1239 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1241 def test__process_guest_epa_numa_params_with_no_node(self
):
1242 expected_numa_result
= []
1243 expected_epa_vcpu_set_result
= False
1245 "numa-node-policy": {
1250 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1251 guest_epa_quota
=guest_epa_quota
,
1254 self
.assertEqual(expected_numa_result
, numa_result
)
1255 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1257 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1258 expected_numa_result
= [{"cores": 3}]
1259 expected_epa_vcpu_set_result
= True
1261 "numa-node-policy": {
1270 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1271 guest_epa_quota
=guest_epa_quota
,
1274 self
.assertEqual(expected_numa_result
, numa_result
)
1275 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1277 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1278 expected_numa_result
= [{"paired_threads": 3}]
1279 expected_epa_vcpu_set_result
= True
1281 "numa-node-policy": {
1284 "paired-threads": {"num-paired-threads": "3"},
1290 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1291 guest_epa_quota
=guest_epa_quota
,
1294 self
.assertEqual(expected_numa_result
, numa_result
)
1295 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1297 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1298 expected_numa_result
= [
1300 "paired-threads-id": [("0", "1"), ("4", "5")],
1303 expected_epa_vcpu_set_result
= False
1305 "numa-node-policy": {
1309 "paired-thread-ids": [
1325 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1326 guest_epa_quota
=guest_epa_quota
,
1329 self
.assertEqual(expected_numa_result
, numa_result
)
1330 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1332 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1333 expected_numa_result
= [{"threads": 3}]
1334 expected_epa_vcpu_set_result
= True
1336 "numa-node-policy": {
1345 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1346 guest_epa_quota
=guest_epa_quota
,
1349 self
.assertEqual(expected_numa_result
, numa_result
)
1350 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1352 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1353 expected_numa_result
= [{"memory": 2}]
1354 expected_epa_vcpu_set_result
= False
1356 "numa-node-policy": {
1365 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1366 guest_epa_quota
=guest_epa_quota
,
1369 self
.assertEqual(expected_numa_result
, numa_result
)
1370 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1372 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1373 expected_numa_result
= [
1379 expected_epa_vcpu_set_result
= False
1381 "numa-node-policy": {
1382 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1386 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1387 guest_epa_quota
=guest_epa_quota
,
1390 self
.assertEqual(expected_numa_result
, numa_result
)
1391 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1393 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1394 expected_numa_result
= [
1405 expected_epa_vcpu_set_result
= False
1407 "numa-node-policy": {
1409 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1410 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1415 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1416 guest_epa_quota
=guest_epa_quota
,
1419 self
.assertEqual(expected_numa_result
, numa_result
)
1420 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1422 def test__process_guest_epa_numa_params_with_1_node(self
):
1423 expected_numa_result
= [
1428 "paired_threads": 3,
1429 "paired-threads-id": [("0", "1"), ("4", "5")],
1434 expected_epa_vcpu_set_result
= True
1436 "numa-node-policy": {
1441 "num-paired-threads": "3",
1442 "paired-thread-ids": [
1460 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1461 guest_epa_quota
=guest_epa_quota
,
1464 self
.assertEqual(expected_numa_result
, numa_result
)
1465 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1467 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1468 expected_numa_result
= [
1471 "paired_threads": 3,
1472 "paired-threads-id": [("0", "1"), ("4", "5")],
1478 "paired_threads": 7,
1479 "paired-threads-id": [("2", "3"), ("5", "6")],
1484 expected_epa_vcpu_set_result
= True
1486 "numa-node-policy": {
1491 "num-paired-threads": "3",
1492 "paired-thread-ids": [
1509 "num-paired-threads": "7",
1510 "paired-thread-ids": [
1528 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1529 guest_epa_quota
=guest_epa_quota
,
1532 self
.assertEqual(expected_numa_result
, numa_result
)
1533 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1535 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1536 expected_numa_result
= {}
1537 expected_epa_vcpu_set_result
= False
1538 guest_epa_quota
= {}
1540 epa_vcpu_set
= False
1542 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1543 guest_epa_quota
=guest_epa_quota
,
1544 vcpu_count
=vcpu_count
,
1545 epa_vcpu_set
=epa_vcpu_set
,
1548 self
.assertDictEqual(expected_numa_result
, numa_result
)
1549 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1551 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1552 expected_numa_result
= {}
1553 expected_epa_vcpu_set_result
= False
1555 "no-cpu-pinning-policy": "here",
1558 epa_vcpu_set
= False
1560 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1561 guest_epa_quota
=guest_epa_quota
,
1562 vcpu_count
=vcpu_count
,
1563 epa_vcpu_set
=epa_vcpu_set
,
1566 self
.assertDictEqual(expected_numa_result
, numa_result
)
1567 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1569 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1570 expected_numa_result
= {}
1571 expected_epa_vcpu_set_result
= True
1573 "cpu-pinning-policy": "DEDICATED",
1578 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1579 guest_epa_quota
=guest_epa_quota
,
1580 vcpu_count
=vcpu_count
,
1581 epa_vcpu_set
=epa_vcpu_set
,
1584 self
.assertDictEqual(expected_numa_result
, numa_result
)
1585 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1587 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self
):
1588 expected_numa_result
= {"threads": 3}
1589 expected_epa_vcpu_set_result
= True
1591 "cpu-pinning-policy": "DEDICATED",
1592 "cpu-thread-pinning-policy": "PREFER",
1595 epa_vcpu_set
= False
1597 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1598 guest_epa_quota
=guest_epa_quota
,
1599 vcpu_count
=vcpu_count
,
1600 epa_vcpu_set
=epa_vcpu_set
,
1603 self
.assertDictEqual(expected_numa_result
, numa_result
)
1604 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1606 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self
):
1607 expected_numa_result
= {"cores": 3}
1608 expected_epa_vcpu_set_result
= True
1610 "cpu-pinning-policy": "DEDICATED",
1611 "cpu-thread-pinning-policy": "ISOLATE",
1614 epa_vcpu_set
= False
1616 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1617 guest_epa_quota
=guest_epa_quota
,
1618 vcpu_count
=vcpu_count
,
1619 epa_vcpu_set
=epa_vcpu_set
,
1622 self
.assertDictEqual(expected_numa_result
, numa_result
)
1623 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1625 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self
):
1626 expected_numa_result
= {"threads": 3}
1627 expected_epa_vcpu_set_result
= True
1629 "cpu-pinning-policy": "DEDICATED",
1630 "cpu-thread-pinning-policy": "REQUIRE",
1633 epa_vcpu_set
= False
1635 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1636 guest_epa_quota
=guest_epa_quota
,
1637 vcpu_count
=vcpu_count
,
1638 epa_vcpu_set
=epa_vcpu_set
,
1641 self
.assertDictEqual(expected_numa_result
, numa_result
)
1642 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1644 def test__process_guest_epa_cpu_pinning_params(self
):
1645 expected_numa_result
= {"threads": 3}
1646 expected_epa_vcpu_set_result
= True
1648 "cpu-pinning-policy": "DEDICATED",
1651 epa_vcpu_set
= False
1653 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1654 guest_epa_quota
=guest_epa_quota
,
1655 vcpu_count
=vcpu_count
,
1656 epa_vcpu_set
=epa_vcpu_set
,
1659 self
.assertDictEqual(expected_numa_result
, numa_result
)
1660 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1662 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1663 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1664 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1665 def test__process_guest_epa_params_with_empty_params(
1667 guest_epa_numa_params
,
1668 guest_epa_cpu_pinning_params
,
1669 guest_epa_quota_params
,
1671 expected_result
= {}
1674 result
= Ns
._process
_epa
_params
(
1675 target_flavor
=target_flavor
,
1678 self
.assertDictEqual(expected_result
, result
)
1679 self
.assertFalse(guest_epa_numa_params
.called
)
1680 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1681 self
.assertFalse(guest_epa_quota_params
.called
)
1683 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1684 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1685 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1686 def test__process_guest_epa_params_with_wrong_params(
1688 guest_epa_numa_params
,
1689 guest_epa_cpu_pinning_params
,
1690 guest_epa_quota_params
,
1692 expected_result
= {}
1694 "no-guest-epa": "here",
1697 result
= Ns
._process
_epa
_params
(
1698 target_flavor
=target_flavor
,
1701 self
.assertDictEqual(expected_result
, result
)
1702 self
.assertFalse(guest_epa_numa_params
.called
)
1703 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1704 self
.assertFalse(guest_epa_quota_params
.called
)
1706 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1707 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1708 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1709 def test__process_guest_epa_params(
1711 guest_epa_numa_params
,
1712 guest_epa_cpu_pinning_params
,
1713 guest_epa_quota_params
,
1716 "mem-policy": "STRICT",
1721 "numa-node-policy": {
1722 "mem-policy": "STRICT",
1727 guest_epa_numa_params
.return_value
= ({}, False)
1728 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1729 guest_epa_quota_params
.return_value
= {}
1731 result
= Ns
._process
_epa
_params
(
1732 target_flavor
=target_flavor
,
1735 self
.assertDictEqual(expected_result
, result
)
1736 self
.assertTrue(guest_epa_numa_params
.called
)
1737 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1738 self
.assertTrue(guest_epa_quota_params
.called
)
1740 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1741 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1742 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1743 def test__process_guest_epa_params_with_mempage_size(
1745 guest_epa_numa_params
,
1746 guest_epa_cpu_pinning_params
,
1747 guest_epa_quota_params
,
1750 "mempage-size": "1G",
1751 "mem-policy": "STRICT",
1756 "mempage-size": "1G",
1757 "numa-node-policy": {
1758 "mem-policy": "STRICT",
1763 guest_epa_numa_params
.return_value
= ({}, False)
1764 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1765 guest_epa_quota_params
.return_value
= {}
1767 result
= Ns
._process
_epa
_params
(
1768 target_flavor
=target_flavor
,
1771 self
.assertDictEqual(expected_result
, result
)
1772 self
.assertTrue(guest_epa_numa_params
.called
)
1773 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1774 self
.assertTrue(guest_epa_quota_params
.called
)
1776 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1777 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1778 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1779 def test__process_guest_epa_params_with_numa(
1781 guest_epa_numa_params
,
1782 guest_epa_cpu_pinning_params
,
1783 guest_epa_quota_params
,
1786 "mempage-size": "1G",
1787 "cpu-pinning-policy": "DEDICATED",
1788 "cpu-thread-pinning-policy": "PREFER",
1793 "paired-threads": 3,
1794 "paired-threads-id": [("0", "1"), ("4", "5")],
1798 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1799 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1800 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1801 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1806 "mempage-size": "1G",
1807 "cpu-pinning-policy": "DEDICATED",
1808 "cpu-thread-pinning-policy": "PREFER",
1809 "numa-node-policy": {
1814 "num-paired-threads": "3",
1815 "paired-thread-ids": [
1854 guest_epa_numa_params
.return_value
= (
1858 "paired-threads": 3,
1859 "paired-threads-id": [("0", "1"), ("4", "5")],
1866 guest_epa_cpu_pinning_params
.return_value
= (
1872 guest_epa_quota_params
.return_value
= {
1895 result
= Ns
._process
_epa
_params
(
1896 target_flavor
=target_flavor
,
1898 self
.assertEqual(expected_result
, result
)
1899 self
.assertTrue(guest_epa_numa_params
.called
)
1900 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1901 self
.assertTrue(guest_epa_quota_params
.called
)
1903 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1904 def test__process_flavor_params_with_empty_target_flavor(
1912 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1917 target_record_id
= ""
1919 with self
.assertRaises(KeyError):
1920 Ns
._process
_flavor
_params
(
1921 target_flavor
=target_flavor
,
1924 target_record_id
=target_record_id
,
1927 self
.assertFalse(epa_params
.called
)
1929 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1930 def test__process_flavor_params_with_wrong_target_flavor(
1935 "no-target-flavor": "here",
1939 target_record_id
= ""
1941 with self
.assertRaises(KeyError):
1942 Ns
._process
_flavor
_params
(
1943 target_flavor
=target_flavor
,
1946 target_record_id
=target_record_id
,
1949 self
.assertFalse(epa_params
.called
)
1951 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1952 def test__process_flavor_params_with_empty_indata(
1976 "memory-mb": "1024",
1981 target_record_id
= ""
1983 epa_params
.return_value
= {}
1985 result
= Ns
._process
_flavor
_params
(
1986 target_flavor
=target_flavor
,
1989 target_record_id
=target_record_id
,
1992 self
.assertTrue(epa_params
.called
)
1993 self
.assertDictEqual(result
, expected_result
)
1995 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1996 def test__process_flavor_params_with_wrong_indata(
2020 "memory-mb": "1024",
2027 target_record_id
= ""
2029 epa_params
.return_value
= {}
2031 result
= Ns
._process
_flavor
_params
(
2032 target_flavor
=target_flavor
,
2035 target_record_id
=target_record_id
,
2038 self
.assertTrue(epa_params
.called
)
2039 self
.assertDictEqual(result
, expected_result
)
2041 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2042 def test__process_flavor_params_with_ephemeral_disk(
2050 db
.get_one
.return_value
= {
2051 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2056 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2060 "id": "without_volumes-vnf",
2061 "product-name": "without_volumes-vnf",
2064 "id": "without_volumes-VM",
2065 "name": "without_volumes-VM",
2066 "sw-image-desc": "ubuntu20.04",
2067 "alternative-sw-image-desc": [
2069 "ubuntu20.04-azure",
2071 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2075 "virtual-storage-desc": [
2076 {"id": "root-volume", "size-of-storage": "10"},
2078 "id": "ephemeral-volume",
2079 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2080 "size-of-storage": "1",
2086 "path": "/app/storage/",
2114 "memory-mb": "1024",
2122 "ns-flavor-id": "test_id",
2123 "virtual-storages": [
2125 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2126 "size-of-storage": "10",
2131 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2136 target_record_id
= ""
2138 epa_params
.return_value
= {}
2140 result
= Ns
._process
_flavor
_params
(
2141 target_flavor
=target_flavor
,
2144 target_record_id
=target_record_id
,
2148 self
.assertTrue(epa_params
.called
)
2149 self
.assertDictEqual(result
, expected_result
)
2151 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2152 def test__process_flavor_params_with_swap_disk(
2179 "memory-mb": "1024",
2187 "ns-flavor-id": "test_id",
2188 "virtual-storages": [
2190 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2191 "size-of-storage": "20",
2196 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2201 target_record_id
= ""
2203 epa_params
.return_value
= {}
2205 result
= Ns
._process
_flavor
_params
(
2206 target_flavor
=target_flavor
,
2209 target_record_id
=target_record_id
,
2212 self
.assertTrue(epa_params
.called
)
2213 self
.assertDictEqual(result
, expected_result
)
2215 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2216 def test__process_flavor_params_with_persistent_root_disk(
2224 db
.get_one
.return_value
= {
2225 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2230 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2234 "id": "several_volumes-vnf",
2235 "product-name": "several_volumes-vnf",
2238 "id": "several_volumes-VM",
2239 "name": "several_volumes-VM",
2240 "sw-image-desc": "ubuntu20.04",
2241 "alternative-sw-image-desc": [
2243 "ubuntu20.04-azure",
2245 "virtual-storage-desc": [
2246 "persistent-root-volume",
2251 "virtual-storage-desc": [
2253 "id": "persistent-root-volume",
2254 "type-of-storage": "persistent-storage:persistent-storage",
2255 "size-of-storage": "10",
2261 "path": "/app/storage/",
2287 "memory-mb": "1024",
2295 "vdu-name": "several_volumes-VM",
2296 "ns-flavor-id": "test_id",
2297 "virtual-storages": [
2299 "type-of-storage": "persistent-storage:persistent-storage",
2300 "size-of-storage": "10",
2305 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2310 target_record_id
= ""
2312 epa_params
.return_value
= {}
2314 result
= Ns
._process
_flavor
_params
(
2315 target_flavor
=target_flavor
,
2318 target_record_id
=target_record_id
,
2322 self
.assertTrue(epa_params
.called
)
2323 self
.assertDictEqual(result
, expected_result
)
2325 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2326 def test__process_flavor_params_with_epa_params(
2337 "numa": "there-is-numa-here",
2348 "numa": "there-is-numa-here",
2357 "memory-mb": "1024",
2365 "ns-flavor-id": "test_id",
2368 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2373 target_record_id
= ""
2375 epa_params
.return_value
= {
2376 "numa": "there-is-numa-here",
2379 result
= Ns
._process
_flavor
_params
(
2380 target_flavor
=target_flavor
,
2383 target_record_id
=target_record_id
,
2386 self
.assertTrue(epa_params
.called
)
2387 self
.assertDictEqual(result
, expected_result
)
2389 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2390 def test__process_flavor_params(
2398 db
.get_one
.return_value
= {
2399 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2404 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2408 "id": "without_volumes-vnf",
2409 "product-name": "without_volumes-vnf",
2412 "id": "without_volumes-VM",
2413 "name": "without_volumes-VM",
2414 "sw-image-desc": "ubuntu20.04",
2415 "alternative-sw-image-desc": [
2417 "ubuntu20.04-azure",
2419 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2423 "virtual-storage-desc": [
2424 {"id": "root-volume", "size-of-storage": "10"},
2426 "id": "ephemeral-volume",
2427 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2428 "size-of-storage": "1",
2434 "path": "/app/storage/",
2449 "numa": "there-is-numa-here",
2462 "numa": "there-is-numa-here",
2471 "memory-mb": "1024",
2479 "ns-flavor-id": "test_id",
2480 "virtual-storages": [
2482 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2483 "size-of-storage": "10",
2486 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2487 "size-of-storage": "20",
2492 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2497 target_record_id
= ""
2499 epa_params
.return_value
= {
2500 "numa": "there-is-numa-here",
2503 result
= Ns
._process
_flavor
_params
(
2504 target_flavor
=target_flavor
,
2507 target_record_id
=target_record_id
,
2511 self
.assertTrue(epa_params
.called
)
2512 self
.assertDictEqual(result
, expected_result
)
2514 def test__process_net_params_with_empty_params(
2524 "provider_network": "some-profile-here",
2526 "some_ip_profile": "here",
2529 target_record_id
= ""
2532 "net_name": "ns-name-vld-name",
2533 "net_type": "bridge",
2535 "some_ip_profile": "here",
2537 "provider_network_profile": "some-profile-here",
2541 result
= Ns
._process
_net
_params
(
2542 target_vld
=target_vld
,
2545 target_record_id
=target_record_id
,
2548 self
.assertDictEqual(expected_result
, result
)
2550 def test__process_net_params_with_vim_info_sdn(
2561 "sdn-ports": ["some", "ports", "here"],
2562 "vlds": ["some", "vlds", "here"],
2565 target_record_id
= "vld.sdn.something"
2568 "sdn-ports": ["some", "ports", "here"],
2569 "vlds": ["some", "vlds", "here"],
2574 result
= Ns
._process
_net
_params
(
2575 target_vld
=target_vld
,
2578 target_record_id
=target_record_id
,
2581 self
.assertDictEqual(expected_result
, result
)
2583 def test__process_net_params_with_vim_info_sdn_target_vim(
2594 "sdn-ports": ["some", "ports", "here"],
2595 "vlds": ["some", "vlds", "here"],
2596 "target_vim": "some-vim",
2599 target_record_id
= "vld.sdn.something"
2601 "depends_on": ["some-vim vld.sdn"],
2603 "sdn-ports": ["some", "ports", "here"],
2604 "vlds": ["some", "vlds", "here"],
2605 "target_vim": "some-vim",
2610 result
= Ns
._process
_net
_params
(
2611 target_vld
=target_vld
,
2614 target_record_id
=target_record_id
,
2617 self
.assertDictEqual(expected_result
, result
)
2619 def test__process_net_params_with_vim_network_name(
2629 "vim_network_name": "some-network-name",
2631 target_record_id
= "vld.sdn.something"
2635 "name": "some-network-name",
2640 result
= Ns
._process
_net
_params
(
2641 target_vld
=target_vld
,
2644 target_record_id
=target_record_id
,
2647 self
.assertDictEqual(expected_result
, result
)
2649 def test__process_net_params_with_vim_network_id(
2659 "vim_network_id": "some-network-id",
2661 target_record_id
= "vld.sdn.something"
2665 "id": "some-network-id",
2670 result
= Ns
._process
_net
_params
(
2671 target_vld
=target_vld
,
2674 target_record_id
=target_record_id
,
2677 self
.assertDictEqual(expected_result
, result
)
2679 def test__process_net_params_with_mgmt_network(
2685 "mgmt-network": "some-mgmt-network",
2691 target_record_id
= "vld.sdn.something"
2699 result
= Ns
._process
_net
_params
(
2700 target_vld
=target_vld
,
2703 target_record_id
=target_record_id
,
2706 self
.assertDictEqual(expected_result
, result
)
2708 def test__process_net_params_with_underlay_eline(
2713 "underlay": "some-underlay-here",
2720 "provider_network": "some-profile-here",
2722 "some_ip_profile": "here",
2725 target_record_id
= ""
2729 "some_ip_profile": "here",
2731 "net_name": "ns-name-vld-name",
2733 "provider_network_profile": "some-profile-here",
2737 result
= Ns
._process
_net
_params
(
2738 target_vld
=target_vld
,
2741 target_record_id
=target_record_id
,
2744 self
.assertDictEqual(expected_result
, result
)
2746 def test__process_net_params_with_underlay_elan(
2751 "underlay": "some-underlay-here",
2758 "provider_network": "some-profile-here",
2760 "some_ip_profile": "here",
2763 target_record_id
= ""
2767 "some_ip_profile": "here",
2769 "net_name": "ns-name-vld-name",
2771 "provider_network_profile": "some-profile-here",
2775 result
= Ns
._process
_net
_params
(
2776 target_vld
=target_vld
,
2779 target_record_id
=target_record_id
,
2782 self
.assertDictEqual(expected_result
, result
)
2784 def test__get_cloud_init_exception(self
):
2785 db_mock
= MagicMock(name
="database mock")
2790 with self
.assertRaises(NsException
):
2791 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2793 def test__get_cloud_init_file_fs_exception(self
):
2794 db_mock
= MagicMock(name
="database mock")
2797 location
= "vnfr_id_123456:file:test_file"
2798 db_mock
.get_one
.return_value
= {
2801 "folder": "/home/osm",
2802 "pkg-dir": "vnfr_test_dir",
2807 with self
.assertRaises(NsException
):
2808 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2810 def test__get_cloud_init_file(self
):
2811 db_mock
= MagicMock(name
="database mock")
2812 fs_mock
= MagicMock(name
="filesystem mock")
2813 file_mock
= MagicMock(name
="file mock")
2815 location
= "vnfr_id_123456:file:test_file"
2816 cloud_init_content
= "this is a cloud init file content"
2818 db_mock
.get_one
.return_value
= {
2821 "folder": "/home/osm",
2822 "pkg-dir": "vnfr_test_dir",
2826 fs_mock
.file_open
.return_value
= file_mock
2827 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2829 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2831 self
.assertEqual(cloud_init_content
, result
)
2833 def test__get_cloud_init_vdu(self
):
2834 db_mock
= MagicMock(name
="database mock")
2837 location
= "vnfr_id_123456:vdu:0"
2838 cloud_init_content
= "this is a cloud init file content"
2840 db_mock
.get_one
.return_value
= {
2843 "cloud-init": cloud_init_content
,
2848 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2850 self
.assertEqual(cloud_init_content
, result
)
2852 @patch("jinja2.Environment.__init__")
2853 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2854 cloud_init_content
= None
2858 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
2860 with self
.assertRaises(NsException
):
2862 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2865 @patch("jinja2.Environment.__init__")
2866 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
2867 cloud_init_content
= None
2871 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
2873 with self
.assertRaises(NsException
):
2875 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2878 @patch("jinja2.Environment.__init__")
2879 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
2880 cloud_init_content
= None
2884 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
2886 with self
.assertRaises(NsException
):
2888 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2891 def test_rendering_jinja2_temp_without_special_characters(self
):
2892 cloud_init_content
= """
2895 table_type: {{type}}
2897 overwrite: {{is_override}}
2900 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2904 "is_override": "False",
2905 "command": "; mkdir abc",
2907 context
= "cloud-init for VM"
2908 expected_result
= """
2916 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
2918 result
= Ns
._parse
_jinja
2(
2919 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2921 self
.assertEqual(result
, expected_result
)
2923 def test_rendering_jinja2_temp_with_special_characters(self
):
2924 cloud_init_content
= """
2927 table_type: {{type}}
2929 overwrite: {{is_override}}
2932 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2936 "is_override": "False",
2937 "command": "& rm -rf",
2939 context
= "cloud-init for VM"
2940 expected_result
= """
2948 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2950 result
= Ns
._parse
_jinja
2(
2951 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2953 self
.assertNotEqual(result
, expected_result
)
2955 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
2956 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
2957 mock_environment
.return_value
= Environment(
2958 undefined
=StrictUndefined
,
2959 autoescape
=select_autoescape(default_for_string
=False, default
=False),
2961 cloud_init_content
= """
2964 table_type: {{type}}
2966 overwrite: {{is_override}}
2969 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2973 "is_override": "False",
2974 "command": "& rm -rf /",
2976 context
= "cloud-init for VM"
2977 expected_result
= """
2985 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2987 result
= Ns
._parse
_jinja
2(
2988 cloud_init_content
=cloud_init_content
,
2992 self
.assertEqual(result
, expected_result
)
2994 @patch("osm_ng_ro.ns.Ns._assign_vim")
2995 def test__rebuild_start_stop_task__successful(self
, assign_vim
):
2998 actions
= ["start", "stop", "rebuild"]
3001 for action
in actions
:
3003 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3006 extra_dict
["params"] = params
3007 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3010 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3011 expected_result
["params"] = params
3012 task
= self
.ns
.rebuild_start_stop_task(
3022 self
.assertDictEqual(task
, expected_result
)
3024 @patch("osm_ng_ro.ns.Ns._assign_vim")
3025 def test__rebuild_start_stop_task__empty_extra_dict__task_without_params(
3030 actions
= ["start", "stop", "rebuild"]
3033 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3036 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3038 task
= self
.ns
.rebuild_start_stop_task(
3048 self
.assertDictEqual(task
, expected_result
)
3050 @patch("osm_ng_ro.ns.Ns._assign_vim")
3051 def test__rebuild_start_stop_task__different_vdu_index__target_record_changes(
3056 actions
= ["start", "stop", "rebuild"]
3059 for action
in actions
:
3061 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3064 extra_dict
["params"] = params
3065 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3068 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3069 expected_result
["params"] = params
3070 task
= self
.ns
.rebuild_start_stop_task(
3080 self
.assertDictEqual(task
, expected_result
)
3082 @patch("osm_ng_ro.ns.Ns._assign_vim")
3083 def test__rebuild_start_stop_task__different_task_index__task_id_changes(
3088 actions
= ["start", "stop", "rebuild"]
3091 for action
in actions
:
3093 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3096 extra_dict
["params"] = params
3097 expected_result
= deepcopy(expected_result_rebuild_start_stop
)
3100 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3101 expected_result
["params"] = params
3102 expected_result
["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:3"
3103 task
= self
.ns
.rebuild_start_stop_task(
3113 self
.assertDictEqual(task
, expected_result
)
3115 @patch("osm_ng_ro.ns.Ns._assign_vim")
3116 def test__rebuild_start_stop_task__assign_vim_raises__task_is_not_created(
3121 actions
= ["start", "stop", "rebuild"]
3124 for action
in actions
:
3126 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3129 extra_dict
["params"] = params
3130 assign_vim
.side_effect
= TestException("Can not connect to VIM.")
3131 with self
.assertRaises(TestException
) as err
:
3132 task
= self
.ns
.rebuild_start_stop_task(
3142 self
.assertEqual(task
, None)
3143 self
.assertEqual(str(err
.exception
), "Can not connect to VIM.")
3145 @patch("osm_ng_ro.ns.Ns._assign_vim")
3146 def test_verticalscale_task__successful(self
, assign_vim
):
3150 task
= self
.ns
.verticalscale_task(
3157 extra_dict_vertical_scale
,
3159 self
.assertDictEqual(task
, expected_result_vertical_scale
)
3161 @patch("osm_ng_ro.ns.Ns._assign_vim")
3162 def test_verticalscale_task__task_index_changes__task_id_changes(self
, assign_vim
):
3166 expected_result
= deepcopy(expected_result_vertical_scale
)
3167 expected_result
["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:2"
3168 task
= self
.ns
.verticalscale_task(
3175 extra_dict_vertical_scale
,
3177 self
.assertDictEqual(task
, expected_result
)
3179 @patch("osm_ng_ro.ns.Ns._assign_vim")
3180 def test_verticalscale_task__empty_extra_dict__expected_result_without_params(
3187 expected_result
= deepcopy(expected_result_vertical_scale
)
3188 expected_result
.pop("params")
3189 task
= self
.ns
.verticalscale_task(
3190 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict
3192 self
.assertDictEqual(task
, expected_result
)
3194 @patch("osm_ng_ro.ns.Ns._assign_vim")
3195 def test_verticalscale_task__assign_vim_raises__task_is_not_created(
3201 assign_vim
.side_effect
= TestException("Can not connect to VIM.")
3202 with self
.assertRaises(TestException
) as err
:
3203 task
= self
.ns
.verticalscale_task(
3210 extra_dict_vertical_scale
,
3212 self
.assertEqual(task
, {})
3213 self
.assertEqual(str(err
.exception
), "Can not connect to VIM.")
3215 @patch("osm_ng_ro.ns.Ns._assign_vim")
3216 def test_migrate_task__successful(self
, assign_vim
):
3220 task
= self
.ns
.migrate_task(
3221 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict_migrate
3223 self
.assertDictEqual(task
, expected_result_migrate
)
3225 @patch("osm_ng_ro.ns.Ns._assign_vim")
3226 def test_migrate_task__empty_extra_dict__task_without_params(self
, assign_vim
):
3231 expected_result
= deepcopy(expected_result_migrate
)
3232 expected_result
.pop("params")
3233 task
= self
.ns
.migrate_task(
3234 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict
3236 self
.assertDictEqual(task
, expected_result
)
3238 @patch("osm_ng_ro.ns.Ns._assign_vim")
3239 def test_migrate_task__different_vdu_index__target_record_with_different_vdu_index(
3245 expected_result
= deepcopy(expected_result_migrate
)
3248 ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3249 task
= self
.ns
.migrate_task(
3250 vdu
, vnf
, vdu_index
, action_id
, nsr_id_2
, task_index
, extra_dict_migrate
3252 self
.assertDictEqual(task
, expected_result
)
3254 @patch("osm_ng_ro.ns.Ns._assign_vim")
3255 def test_migrate_task__assign_vim_raises__task_is_not_created(self
, assign_vim
):
3259 assign_vim
.side_effect
= TestException("Can not connect to VIM.")
3260 with self
.assertRaises(TestException
) as err
:
3261 task
= self
.ns
.migrate_task(
3262 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict_migrate
3264 self
.assertDictEqual(task
, {})
3265 self
.assertEqual(str(err
.exception
), "Can not connect to VIM.")
3268 class TestProcessVduParams(unittest
.TestCase
):
3271 self
.logger
= CopyingMock(autospec
=True)
3273 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3274 def test_find_persistent_root_volumes_empty_instantiation_vol_list(
3275 self
, mock_volume_keeping_required
3277 """Find persistent root volume, instantiation_vol_list is empty."""
3278 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3279 target_vdu
= target_vdu_wth_persistent_storage
3280 vdu_instantiation_volumes_list
= []
3282 mock_volume_keeping_required
.return_value
= True
3283 expected_root_disk
= {
3284 "id": "persistent-root-volume",
3285 "type-of-storage": "persistent-storage:persistent-storage",
3286 "size-of-storage": "10",
3287 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
3289 expected_persist_root_disk
= {
3290 "persistent-root-volume": {
3291 "image_id": "ubuntu20.04",
3296 expected_disk_list
= [
3298 "image_id": "ubuntu20.04",
3303 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3304 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3306 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3307 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3308 self
.assertEqual(disk_list
, expected_disk_list
)
3309 self
.assertEqual(len(disk_list
), 1)
3311 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3312 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
3313 self
, mock_volume_keeping_required
3315 """Find persistent root volume, always selects the first vsd as root volume."""
3316 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3317 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3318 "persistent-volume2",
3319 "persistent-root-volume",
3322 target_vdu
= target_vdu_wth_persistent_storage
3323 vdu_instantiation_volumes_list
= []
3325 mock_volume_keeping_required
.return_value
= True
3326 expected_root_disk
= {
3327 "id": "persistent-volume2",
3328 "type-of-storage": "persistent-storage:persistent-storage",
3329 "size-of-storage": "10",
3331 expected_persist_root_disk
= {
3332 "persistent-volume2": {
3333 "image_id": "ubuntu20.04",
3338 expected_disk_list
= [
3340 "image_id": "ubuntu20.04",
3345 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3346 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3348 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3349 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3350 self
.assertEqual(disk_list
, expected_disk_list
)
3351 self
.assertEqual(len(disk_list
), 1)
3353 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3354 def test_find_persistent_root_volumes_empty_size_of_storage(
3355 self
, mock_volume_keeping_required
3357 """Find persistent root volume, size of storage is empty."""
3358 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3359 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3360 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3361 "persistent-volume2",
3362 "persistent-root-volume",
3365 target_vdu
= target_vdu_wth_persistent_storage
3366 vdu_instantiation_volumes_list
= []
3368 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3369 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3371 self
.assertEqual(persist_root_disk
, {})
3372 mock_volume_keeping_required
.assert_not_called()
3373 self
.assertEqual(disk_list
, [])
3375 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3376 def test_find_persistent_root_volumes_keeping_is_not_required(
3377 self
, mock_volume_keeping_required
3379 """Find persistent root volume, volume keeping is not required."""
3380 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3381 vnfd
["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
3382 {"key": "keep-volume", "value": "false"},
3384 target_vdu
= target_vdu_wth_persistent_storage
3385 vdu_instantiation_volumes_list
= []
3387 mock_volume_keeping_required
.return_value
= False
3388 expected_root_disk
= {
3389 "id": "persistent-root-volume",
3390 "type-of-storage": "persistent-storage:persistent-storage",
3391 "size-of-storage": "10",
3392 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
3394 expected_persist_root_disk
= {
3395 "persistent-root-volume": {
3396 "image_id": "ubuntu20.04",
3401 expected_disk_list
= [
3403 "image_id": "ubuntu20.04",
3408 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3409 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3411 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3412 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3413 self
.assertEqual(disk_list
, expected_disk_list
)
3414 self
.assertEqual(len(disk_list
), 1)
3416 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3417 def test_find_persistent_root_volumes_target_vdu_mismatch(
3418 self
, mock_volume_keeping_required
3420 """Find persistent root volume, target vdu name is not matching."""
3421 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3422 vnfd
["vdu"][0]["name"] = "Several_Volumes-VM"
3423 target_vdu
= target_vdu_wth_persistent_storage
3424 vdu_instantiation_volumes_list
= []
3426 result
= self
.ns
.find_persistent_root_volumes(
3427 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3429 self
.assertEqual(result
, None)
3430 mock_volume_keeping_required
.assert_not_called()
3431 self
.assertEqual(disk_list
, [])
3432 self
.assertEqual(len(disk_list
), 0)
3434 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3435 def test_find_persistent_root_volumes_with_instantiation_vol_list(
3436 self
, mock_volume_keeping_required
3438 """Find persistent root volume, existing volume needs to be used."""
3439 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3440 target_vdu
= target_vdu_wth_persistent_storage
3441 vdu_instantiation_volumes_list
= [
3443 "vim-volume-id": vim_volume_id
,
3444 "name": "persistent-root-volume",
3448 expected_persist_root_disk
= {
3449 "persistent-root-volume": {
3450 "vim_volume_id": vim_volume_id
,
3451 "image_id": "ubuntu20.04",
3454 expected_disk_list
= [
3456 "vim_volume_id": vim_volume_id
,
3457 "image_id": "ubuntu20.04",
3460 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3461 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3463 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3464 mock_volume_keeping_required
.assert_not_called()
3465 self
.assertEqual(disk_list
, expected_disk_list
)
3466 self
.assertEqual(len(disk_list
), 1)
3468 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3469 def test_find_persistent_root_volumes_invalid_instantiation_params(
3470 self
, mock_volume_keeping_required
3472 """Find persistent root volume, existing volume id keyword is invalid."""
3473 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3474 target_vdu
= target_vdu_wth_persistent_storage
3475 vdu_instantiation_volumes_list
= [
3477 "volume-id": vim_volume_id
,
3478 "name": "persistent-root-volume",
3482 with self
.assertRaises(KeyError):
3483 self
.ns
.find_persistent_root_volumes(
3484 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3486 mock_volume_keeping_required
.assert_not_called()
3487 self
.assertEqual(disk_list
, [])
3488 self
.assertEqual(len(disk_list
), 0)
3490 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3491 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3492 self
, mock_volume_keeping_required
3494 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3495 persistent_root_disk
= {
3496 "persistent-root-volume": {
3497 "image_id": "ubuntu20.04",
3502 mock_volume_keeping_required
.return_value
= False
3503 target_vdu
= target_vdu_wth_persistent_storage
3504 vdu_instantiation_volumes_list
= []
3507 "image_id": "ubuntu20.04",
3513 "id": "persistent-volume2",
3514 "size-of-storage": "10",
3515 "type-of-storage": "persistent-storage:persistent-storage",
3517 expected_disk_list
= [
3519 "image_id": "ubuntu20.04",
3528 self
.ns
.find_persistent_volumes(
3529 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3531 self
.assertEqual(disk_list
, expected_disk_list
)
3532 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3534 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3535 def test_find_persistent_volumes_vdu_wth_inst_vol_list(
3536 self
, mock_volume_keeping_required
3538 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3539 persistent_root_disk
= {
3540 "persistent-root-volume": {
3541 "image_id": "ubuntu20.04",
3546 vdu_instantiation_volumes_list
= [
3548 "vim-volume-id": vim_volume_id
,
3549 "name": "persistent-volume2",
3552 target_vdu
= target_vdu_wth_persistent_storage
3555 "image_id": "ubuntu20.04",
3560 expected_disk_list
= [
3562 "image_id": "ubuntu20.04",
3567 "vim_volume_id": vim_volume_id
,
3570 self
.ns
.find_persistent_volumes(
3571 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3573 self
.assertEqual(disk_list
, expected_disk_list
)
3574 mock_volume_keeping_required
.assert_not_called()
3576 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3577 def test_find_persistent_volumes_vdu_wthout_persistent_storage(
3578 self
, mock_volume_keeping_required
3580 """Find persistent ordinary volume, there is not any persistent disk."""
3581 persistent_root_disk
= {}
3582 vdu_instantiation_volumes_list
= []
3583 mock_volume_keeping_required
.return_value
= False
3584 target_vdu
= target_vdu_wthout_persistent_storage
3586 self
.ns
.find_persistent_volumes(
3587 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3589 self
.assertEqual(disk_list
, disk_list
)
3590 mock_volume_keeping_required
.assert_not_called()
3592 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3593 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3594 self
, mock_volume_keeping_required
3596 """There is persistent root disk, but there is not ordinary persistent disk."""
3597 persistent_root_disk
= {
3598 "persistent-root-volume": {
3599 "image_id": "ubuntu20.04",
3604 vdu_instantiation_volumes_list
= []
3605 mock_volume_keeping_required
.return_value
= False
3606 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3607 target_vdu
["virtual-storages"] = [
3609 "id": "persistent-root-volume",
3610 "size-of-storage": "10",
3611 "type-of-storage": "persistent-storage:persistent-storage",
3612 "vdu-storage-requirements": [
3613 {"key": "keep-volume", "value": "true"},
3617 "id": "ephemeral-volume",
3618 "size-of-storage": "1",
3619 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3624 "image_id": "ubuntu20.04",
3629 self
.ns
.find_persistent_volumes(
3630 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3632 self
.assertEqual(disk_list
, disk_list
)
3633 mock_volume_keeping_required
.assert_not_called()
3635 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3636 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
3637 self
, mock_volume_keeping_required
3639 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3640 vim-volume-id is given as instantiation parameter but disk id is not matching.
3642 mock_volume_keeping_required
.return_value
= True
3643 vdu_instantiation_volumes_list
= [
3645 "vim-volume-id": vim_volume_id
,
3646 "name": "persistent-volume3",
3649 persistent_root_disk
= {
3650 "persistent-root-volume": {
3651 "image_id": "ubuntu20.04",
3658 "image_id": "ubuntu20.04",
3663 expected_disk_list
= [
3665 "image_id": "ubuntu20.04",
3675 "id": "persistent-volume2",
3676 "size-of-storage": "10",
3677 "type-of-storage": "persistent-storage:persistent-storage",
3679 target_vdu
= target_vdu_wth_persistent_storage
3680 self
.ns
.find_persistent_volumes(
3681 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3683 self
.assertEqual(disk_list
, expected_disk_list
)
3684 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3686 def test_is_volume_keeping_required_true(self
):
3687 """Volume keeping is required."""
3688 virtual_storage_descriptor
= {
3689 "id": "persistent-root-volume",
3690 "type-of-storage": "persistent-storage:persistent-storage",
3691 "size-of-storage": "10",
3692 "vdu-storage-requirements": [
3693 {"key": "keep-volume", "value": "true"},
3696 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3697 self
.assertEqual(result
, True)
3699 def test_is_volume_keeping_required_false(self
):
3700 """Volume keeping is not required."""
3701 virtual_storage_descriptor
= {
3702 "id": "persistent-root-volume",
3703 "type-of-storage": "persistent-storage:persistent-storage",
3704 "size-of-storage": "10",
3705 "vdu-storage-requirements": [
3706 {"key": "keep-volume", "value": "false"},
3709 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3710 self
.assertEqual(result
, False)
3712 def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self
):
3713 """Volume keeping is not required, vdu-storage-requirements key does not exist."""
3714 virtual_storage_descriptor
= {
3715 "id": "persistent-root-volume",
3716 "type-of-storage": "persistent-storage:persistent-storage",
3717 "size-of-storage": "10",
3719 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3720 self
.assertEqual(result
, False)
3722 def test_is_volume_keeping_required_wrong_keyword(self
):
3723 """vdu-storage-requirements key to indicate keeping-volume is wrong."""
3724 virtual_storage_descriptor
= {
3725 "id": "persistent-root-volume",
3726 "type-of-storage": "persistent-storage:persistent-storage",
3727 "size-of-storage": "10",
3728 "vdu-storage-requirements": [
3729 {"key": "hold-volume", "value": "true"},
3732 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3733 self
.assertEqual(result
, False)
3735 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3736 """Interfaces are sorted according to position, all have positions."""
3737 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3738 target_vdu
["interfaces"] = [
3741 "ns-vld-id": "datanet",
3746 "ns-vld-id": "mgmtnet",
3750 sorted_interfaces
= [
3753 "ns-vld-id": "mgmtnet",
3758 "ns-vld-id": "datanet",
3762 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3763 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3765 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3766 """Interfaces are sorted according to position, some of them have positions."""
3767 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3768 target_vdu
["interfaces"] = [
3771 "ns-vld-id": "mgmtnet",
3775 "ns-vld-id": "datanet",
3779 sorted_interfaces
= [
3782 "ns-vld-id": "datanet",
3787 "ns-vld-id": "mgmtnet",
3790 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3791 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3793 def test_sort_vdu_interfaces_position_empty_interface_list(self
):
3794 """Interface list is empty."""
3795 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3796 target_vdu
["interfaces"] = []
3797 sorted_interfaces
= []
3798 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3799 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3801 def test_partially_locate_vdu_interfaces(self
):
3802 """Some interfaces have positions."""
3803 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3804 target_vdu
["interfaces"] = [
3807 "ns-vld-id": "net1",
3809 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3812 "ns-vld-id": "mgmtnet",
3816 "ns-vld-id": "datanet",
3820 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3821 self
.assertDictEqual(
3822 target_vdu
["interfaces"][0],
3825 "ns-vld-id": "datanet",
3829 self
.assertDictEqual(
3830 target_vdu
["interfaces"][2],
3831 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3834 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3835 """Some interfaces have positions, position start from 0."""
3836 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3837 target_vdu
["interfaces"] = [
3840 "ns-vld-id": "net1",
3842 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3845 "ns-vld-id": "mgmtnet",
3849 "ns-vld-id": "datanet",
3853 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3854 self
.assertDictEqual(
3855 target_vdu
["interfaces"][0],
3858 "ns-vld-id": "datanet",
3862 self
.assertDictEqual(
3863 target_vdu
["interfaces"][3],
3864 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3867 def test_partially_locate_vdu_interfaces_wthout_position(self
):
3868 """Interfaces do not have positions."""
3869 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3870 target_vdu
["interfaces"] = interfaces_wthout_positions
3871 expected_result
= deepcopy(target_vdu
["interfaces"])
3872 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3873 self
.assertEqual(target_vdu
["interfaces"], expected_result
)
3875 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3876 """All interfaces have position."""
3877 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3878 target_vdu
["interfaces"] = interfaces_wth_all_positions
3879 expected_interfaces
= [
3882 "ns-vld-id": "net2",
3887 "ns-vld-id": "mgmtnet",
3892 "ns-vld-id": "net1",
3896 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3897 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3899 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3900 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3901 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3902 """Target_vdu has cloud-init and boot-data-drive."""
3903 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3904 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3905 target_vdu
["boot-data-drive"] = "vda"
3907 mock_get_cloud_init
.return_value
= cloud_init_content
3908 mock_parse_jinja2
.return_value
= user_data
3910 "user-data": user_data
,
3911 "boot-data-drive": "vda",
3913 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3914 self
.assertDictEqual(result
, expected_result
)
3915 mock_get_cloud_init
.assert_called_once_with(
3916 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3918 mock_parse_jinja2
.assert_called_once_with(
3919 cloud_init_content
=cloud_init_content
,
3921 context
="sample-cloud-init-path",
3924 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3925 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3926 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3927 self
, mock_parse_jinja2
, mock_get_cloud_init
3929 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3930 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3931 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3932 target_vdu
["boot-data-drive"] = "vda"
3934 mock_get_cloud_init
.side_effect
= NsException(
3935 "Mismatch descriptor for cloud init."
3938 with self
.assertRaises(NsException
) as err
:
3939 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3940 self
.assertEqual(str(err
.exception
), "Mismatch descriptor for cloud init.")
3942 mock_get_cloud_init
.assert_called_once_with(
3943 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3945 mock_parse_jinja2
.assert_not_called()
3947 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3948 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3949 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3950 self
, mock_parse_jinja2
, mock_get_cloud_init
3952 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3953 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3954 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3955 target_vdu
["boot-data-drive"] = "vda"
3957 mock_get_cloud_init
.return_value
= cloud_init_content
3958 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3960 with self
.assertRaises(NsException
) as err
:
3961 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3962 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3963 mock_get_cloud_init
.assert_called_once_with(
3964 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3966 mock_parse_jinja2
.assert_called_once_with(
3967 cloud_init_content
=cloud_init_content
,
3969 context
="sample-cloud-init-path",
3972 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3973 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3974 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3975 self
, mock_parse_jinja2
, mock_get_cloud_init
3977 """Target_vdu has cloud-init but do not have boot-data-drive."""
3978 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3979 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3981 mock_get_cloud_init
.return_value
= cloud_init_content
3982 mock_parse_jinja2
.return_value
= user_data
3984 "user-data": user_data
,
3986 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3987 self
.assertDictEqual(result
, expected_result
)
3988 mock_get_cloud_init
.assert_called_once_with(
3989 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3991 mock_parse_jinja2
.assert_called_once_with(
3992 cloud_init_content
=cloud_init_content
,
3994 context
="sample-cloud-init-path",
3997 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3998 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3999 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
4000 self
, mock_parse_jinja2
, mock_get_cloud_init
4002 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
4003 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4004 target_vdu
["cloud-init"] = "sample-cloud-init-path"
4005 target_vdu
["boot-data-drive"] = "vda"
4006 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
4007 mock_parse_jinja2
.return_value
= user_data
4009 "user-data": user_data
,
4010 "boot-data-drive": "vda",
4012 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4013 self
.assertDictEqual(result
, expected_result
)
4014 mock_get_cloud_init
.assert_not_called()
4015 mock_parse_jinja2
.assert_called_once_with(
4016 cloud_init_content
=cloud_init_content
,
4018 context
="sample-cloud-init-path",
4021 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4022 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4023 def test_prepare_vdu_cloud_init_no_cloud_init(
4024 self
, mock_parse_jinja2
, mock_get_cloud_init
4026 """Target_vdu do not have cloud-init."""
4027 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4028 target_vdu
["boot-data-drive"] = "vda"
4031 "boot-data-drive": "vda",
4033 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4034 self
.assertDictEqual(result
, expected_result
)
4035 mock_get_cloud_init
.assert_not_called()
4036 mock_parse_jinja2
.assert_not_called()
4038 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
4039 """ns_vld and vnf_vld both exist."""
4042 "ns-vld-id": "mgmtnet",
4043 "vnf-vld-id": "mgmt_cp_int",
4045 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
4046 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4047 interface
, ns_preffix
, vnf_preffix
4049 self
.assertEqual(result
, expected_result
)
4051 def test_check_vld_information_of_interfaces_empty_interfaces(self
):
4052 """Interface dict is empty."""
4054 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4055 interface
, ns_preffix
, vnf_preffix
4057 self
.assertEqual(result
, "")
4059 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
4060 """Interface dict has only vnf_vld."""
4063 "vnf-vld-id": "mgmt_cp_int",
4065 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
4066 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4067 interface
, ns_preffix
, vnf_preffix
4069 self
.assertEqual(result
, expected_result
)
4071 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
4074 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
4077 "vnf-vld-id": "mgmt_cp_int",
4080 with self
.assertRaises(Exception) as err
:
4081 self
.ns
._check
_vld
_information
_of
_interfaces
(
4082 interface
, ns_preffix
, vnf_preffix
4084 self
.assertEqual(type(err
), TypeError)
4086 def test_prepare_interface_port_security_has_security_details(self
):
4087 """Interface dict has port security details."""
4090 "ns-vld-id": "mgmtnet",
4091 "vnf-vld-id": "mgmt_cp_int",
4092 "port-security-enabled": True,
4093 "port-security-disable-strategy": "allow-address-pairs",
4095 expected_interface
= {
4097 "ns-vld-id": "mgmtnet",
4098 "vnf-vld-id": "mgmt_cp_int",
4099 "port_security": True,
4100 "port_security_disable_strategy": "allow-address-pairs",
4102 self
.ns
._prepare
_interface
_port
_security
(interface
)
4103 self
.assertDictEqual(interface
, expected_interface
)
4105 def test_prepare_interface_port_security_empty_interfaces(self
):
4106 """Interface dict is empty."""
4108 expected_interface
= {}
4109 self
.ns
._prepare
_interface
_port
_security
(interface
)
4110 self
.assertDictEqual(interface
, expected_interface
)
4112 def test_prepare_interface_port_security_wthout_port_security(self
):
4113 """Interface dict does not have port security details."""
4116 "ns-vld-id": "mgmtnet",
4117 "vnf-vld-id": "mgmt_cp_int",
4119 expected_interface
= {
4121 "ns-vld-id": "mgmtnet",
4122 "vnf-vld-id": "mgmt_cp_int",
4124 self
.ns
._prepare
_interface
_port
_security
(interface
)
4125 self
.assertDictEqual(interface
, expected_interface
)
4127 def test_create_net_item_of_interface_floating_ip_port_security(self
):
4128 """Interface dict has floating ip, port-security details."""
4131 "vcpi": "sample_vcpi",
4132 "port_security": True,
4133 "port_security_disable_strategy": "allow-address-pairs",
4134 "floating_ip": "10.1.1.12",
4135 "ns-vld-id": "mgmtnet",
4136 "vnf-vld-id": "mgmt_cp_int",
4138 net_text
= f
"{ns_preffix}"
4139 expected_net_item
= {
4141 "port_security": True,
4142 "port_security_disable_strategy": "allow-address-pairs",
4143 "floating_ip": "10.1.1.12",
4144 "net_id": f
"TASK-{ns_preffix}",
4147 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4148 self
.assertDictEqual(result
, expected_net_item
)
4150 def test_create_net_item_of_interface_invalid_net_text(self
):
4151 """net-text is invalid."""
4154 "vcpi": "sample_vcpi",
4155 "port_security": True,
4156 "port_security_disable_strategy": "allow-address-pairs",
4157 "floating_ip": "10.1.1.12",
4158 "ns-vld-id": "mgmtnet",
4159 "vnf-vld-id": "mgmt_cp_int",
4162 with self
.assertRaises(TypeError):
4163 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4165 def test_create_net_item_of_interface_empty_interface(self
):
4166 """Interface dict is empty."""
4168 net_text
= ns_preffix
4169 expected_net_item
= {
4170 "net_id": f
"TASK-{ns_preffix}",
4173 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4174 self
.assertDictEqual(result
, expected_net_item
)
4176 @patch("osm_ng_ro.ns.deep_get")
4177 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
4178 """Interface type is SR-IOV."""
4181 "vcpi": "sample_vcpi",
4182 "port_security": True,
4183 "port_security_disable_strategy": "allow-address-pairs",
4184 "floating_ip": "10.1.1.12",
4185 "ns-vld-id": "mgmtnet",
4186 "vnf-vld-id": "mgmt_cp_int",
4189 mock_deep_get
.return_value
= "SR-IOV"
4190 net_text
= ns_preffix
4192 expected_net_item
= {
4197 self
.ns
._prepare
_type
_of
_interface
(
4198 interface
, tasks_by_target_record_id
, net_text
, net_item
4200 self
.assertDictEqual(net_item
, expected_net_item
)
4203 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4205 mock_deep_get
.assert_called_once_with(
4206 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4209 @patch("osm_ng_ro.ns.deep_get")
4210 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4213 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4216 "vcpi": "sample_vcpi",
4217 "port_security": True,
4218 "port_security_disable_strategy": "allow-address-pairs",
4219 "floating_ip": "10.1.1.12",
4220 "ns-vld-id": "mgmtnet",
4221 "vnf-vld-id": "mgmt_cp_int",
4222 "type": "PCI-PASSTHROUGH",
4224 mock_deep_get
.return_value
= {}
4225 tasks_by_target_record_id
= {}
4226 net_text
= ns_preffix
4228 expected_net_item
= {
4230 "model": "PCI-PASSTHROUGH",
4231 "type": "PCI-PASSTHROUGH",
4233 self
.ns
._prepare
_type
_of
_interface
(
4234 interface
, tasks_by_target_record_id
, net_text
, net_item
4236 self
.assertDictEqual(net_item
, expected_net_item
)
4237 mock_deep_get
.assert_called_once_with(
4238 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4241 @patch("osm_ng_ro.ns.deep_get")
4242 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4243 """Interface type is mgmt."""
4246 "vcpi": "sample_vcpi",
4247 "port_security": True,
4248 "port_security_disable_strategy": "allow-address-pairs",
4249 "floating_ip": "10.1.1.12",
4250 "ns-vld-id": "mgmtnet",
4251 "vnf-vld-id": "mgmt_cp_int",
4254 tasks_by_target_record_id
= {}
4255 net_text
= ns_preffix
4257 expected_net_item
= {
4260 self
.ns
._prepare
_type
_of
_interface
(
4261 interface
, tasks_by_target_record_id
, net_text
, net_item
4263 self
.assertDictEqual(net_item
, expected_net_item
)
4264 mock_deep_get
.assert_not_called()
4266 @patch("osm_ng_ro.ns.deep_get")
4267 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4268 """Interface type is bridge."""
4271 "vcpi": "sample_vcpi",
4272 "port_security": True,
4273 "port_security_disable_strategy": "allow-address-pairs",
4274 "floating_ip": "10.1.1.12",
4275 "ns-vld-id": "mgmtnet",
4276 "vnf-vld-id": "mgmt_cp_int",
4278 tasks_by_target_record_id
= {}
4279 net_text
= ns_preffix
4281 expected_net_item
= {
4285 self
.ns
._prepare
_type
_of
_interface
(
4286 interface
, tasks_by_target_record_id
, net_text
, net_item
4288 self
.assertDictEqual(net_item
, expected_net_item
)
4289 mock_deep_get
.assert_not_called()
4291 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4292 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4293 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4294 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4295 def test_prepare_vdu_interfaces(
4297 mock_type_of_interface
,
4298 mock_item_of_interface
,
4300 mock_vld_information_of_interface
,
4302 """Prepare vdu interfaces successfully."""
4303 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4306 "ns-vld-id": "net1",
4307 "ip-address": "13.2.12.31",
4308 "mgmt-interface": True,
4312 "vnf-vld-id": "net2",
4313 "mac-address": "d0:94:66:ed:fc:e2",
4317 "ns-vld-id": "mgmtnet",
4319 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4321 "params": "test_params",
4322 "find_params": "test_find_params",
4326 net_text_1
= f
"{ns_preffix}:net1"
4327 net_text_2
= f
"{vnf_preffix}:net2"
4328 net_text_3
= f
"{ns_preffix}:mgmtnet"
4331 "net_id": f
"TASK-{ns_preffix}",
4336 "net_id": f
"TASK-{ns_preffix}",
4341 "net_id": f
"TASK-{ns_preffix}",
4344 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4345 mock_vld_information_of_interface
.side_effect
= [
4351 expected_extra_dict
= {
4352 "params": "test_params",
4353 "find_params": "test_find_params",
4354 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4355 "mgmt_vdu_interface": 0,
4357 updated_net_item1
= deepcopy(net_item_1
)
4358 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4359 updated_net_item2
= deepcopy(net_item_2
)
4360 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4361 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4362 self
.ns
._prepare
_vdu
_interfaces
(
4368 tasks_by_target_record_id
,
4371 _call_mock_vld_information_of_interface
= (
4372 mock_vld_information_of_interface
.call_args_list
4375 _call_mock_vld_information_of_interface
[0][0],
4376 (interface_1
, ns_preffix
, vnf_preffix
),
4379 _call_mock_vld_information_of_interface
[1][0],
4380 (interface_2
, ns_preffix
, vnf_preffix
),
4383 _call_mock_vld_information_of_interface
[2][0],
4384 (interface_3
, ns_preffix
, vnf_preffix
),
4387 _call_mock_port_security
= mock_port_security
.call_args_list
4388 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4389 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4390 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4392 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4393 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4394 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4395 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4397 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4399 _call_mock_type_of_interface
[0][0],
4400 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4403 _call_mock_type_of_interface
[1][0],
4404 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4407 _call_mock_type_of_interface
[2][0],
4408 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4410 self
.assertEqual(net_list
, expected_net_list
)
4411 self
.assertEqual(extra_dict
, expected_extra_dict
)
4412 self
.logger
.error
.assert_not_called()
4414 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4415 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4416 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4417 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4418 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4420 mock_type_of_interface
,
4421 mock_item_of_interface
,
4423 mock_vld_information_of_interface
,
4425 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4426 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4429 "ns-vld-id": "net1",
4430 "ip-address": "13.2.12.31",
4431 "mgmt-interface": True,
4435 "vnf-vld-id": "net2",
4436 "mac-address": "d0:94:66:ed:fc:e2",
4440 "ns-vld-id": "mgmtnet",
4442 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4444 "params": "test_params",
4445 "find_params": "test_find_params",
4448 net_text_1
= f
"{ns_preffix}:net1"
4449 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4451 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4453 expected_extra_dict
= {
4454 "params": "test_params",
4455 "find_params": "test_find_params",
4456 "depends_on": [net_text_1
],
4458 with self
.assertRaises(TypeError):
4459 self
.ns
._prepare
_vdu
_interfaces
(
4465 tasks_by_target_record_id
,
4469 _call_mock_vld_information_of_interface
= (
4470 mock_vld_information_of_interface
.call_args_list
4473 _call_mock_vld_information_of_interface
[0][0],
4474 (interface_1
, ns_preffix
, vnf_preffix
),
4477 _call_mock_port_security
= mock_port_security
.call_args_list
4478 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4480 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4481 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4483 mock_type_of_interface
.assert_not_called()
4484 self
.logger
.error
.assert_not_called()
4485 self
.assertEqual(net_list
, [])
4486 self
.assertEqual(extra_dict
, expected_extra_dict
)
4488 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4489 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4490 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4491 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4492 def test_prepare_vdu_interfaces_vld_information_is_empty(
4494 mock_type_of_interface
,
4495 mock_item_of_interface
,
4497 mock_vld_information_of_interface
,
4499 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4500 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4503 "ns-vld-id": "net1",
4504 "ip-address": "13.2.12.31",
4505 "mgmt-interface": True,
4509 "vnf-vld-id": "net2",
4510 "mac-address": "d0:94:66:ed:fc:e2",
4514 "ns-vld-id": "mgmtnet",
4516 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4518 "params": "test_params",
4519 "find_params": "test_find_params",
4522 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4524 self
.ns
._prepare
_vdu
_interfaces
(
4530 tasks_by_target_record_id
,
4534 _call_mock_vld_information_of_interface
= (
4535 mock_vld_information_of_interface
.call_args_list
4538 _call_mock_vld_information_of_interface
[0][0],
4539 (interface_1
, ns_preffix
, vnf_preffix
),
4542 _call_mock_vld_information_of_interface
[1][0],
4543 (interface_2
, ns_preffix
, vnf_preffix
),
4546 _call_mock_vld_information_of_interface
[2][0],
4547 (interface_3
, ns_preffix
, vnf_preffix
),
4550 _call_logger
= self
.logger
.error
.call_args_list
4553 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4557 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4561 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4563 self
.assertEqual(net_list
, [])
4567 "params": "test_params",
4568 "find_params": "test_find_params",
4573 mock_item_of_interface
.assert_not_called()
4574 mock_port_security
.assert_not_called()
4575 mock_type_of_interface
.assert_not_called()
4577 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4578 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4579 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4580 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4581 def test_prepare_vdu_interfaces_empty_interface_list(
4583 mock_type_of_interface
,
4584 mock_item_of_interface
,
4586 mock_vld_information_of_interface
,
4588 """Prepare vdu interfaces, interface list is empty."""
4589 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4590 target_vdu
["interfaces"] = []
4593 self
.ns
._prepare
_vdu
_interfaces
(
4599 tasks_by_target_record_id
,
4602 mock_type_of_interface
.assert_not_called()
4603 mock_vld_information_of_interface
.assert_not_called()
4604 mock_item_of_interface
.assert_not_called()
4605 mock_port_security
.assert_not_called()
4607 def test_prepare_vdu_ssh_keys(self
):
4608 """Target_vdu has ssh-keys and ro_nsr_public_key exists."""
4609 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4610 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4611 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4612 target_vdu
["ssh-access-required"] = True
4614 expected_cloud_config
= {
4615 "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]
4617 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4618 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4620 def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self
):
4621 """Target_vdu does not have ssh-keys."""
4622 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4623 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4624 target_vdu
["ssh-access-required"] = True
4626 expected_cloud_config
= {"key-pairs": [{"public_key": "path_of_public_key"}]}
4627 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4628 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4630 def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self
):
4631 """Target_vdu has ssh-keys, ssh-access is not required."""
4632 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4633 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4634 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4635 target_vdu
["ssh-access-required"] = False
4637 expected_cloud_config
= {"key-pairs": ["sample-ssh-key"]}
4638 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4639 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4641 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4642 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4643 def test_add_persistent_root_disk_to_disk_list_keep_false(
4644 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4646 """Add persistent root disk to disk_list, keep volume set to False."""
4648 "id": "persistent-root-volume",
4649 "type-of-storage": "persistent-storage:persistent-storage",
4650 "size-of-storage": "10",
4652 mock_select_persistent_root_disk
.return_value
= root_disk
4653 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4654 vnfd
["virtual-storage-desc"][1] = root_disk
4655 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4656 persistent_root_disk
= {}
4658 mock_volume_keeping_required
.return_value
= False
4659 expected_disk_list
= [
4661 "image_id": "ubuntu20.04",
4666 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4667 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4669 self
.assertEqual(disk_list
, expected_disk_list
)
4670 mock_select_persistent_root_disk
.assert_called_once()
4671 mock_volume_keeping_required
.assert_called_once()
4673 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4674 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4675 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4676 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4678 """Add persistent root disk to disk_list"""
4680 "id": "persistent-root-volume",
4681 "type-of-storage": "persistent-storage:persistent-storage",
4682 "size-of-storage": "10",
4684 mock_select_persistent_root_disk
.side_effect
= AttributeError
4685 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4686 vnfd
["virtual-storage-desc"][1] = root_disk
4687 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4688 persistent_root_disk
= {}
4690 with self
.assertRaises(AttributeError):
4691 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4692 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4694 self
.assertEqual(disk_list
, [])
4695 mock_select_persistent_root_disk
.assert_called_once()
4696 mock_volume_keeping_required
.assert_not_called()
4698 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4699 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4700 def test_add_persistent_root_disk_to_disk_list_keep_true(
4701 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4703 """Add persistent root disk, keeo volume set to True."""
4704 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4705 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4706 mock_volume_keeping_required
.return_value
= True
4708 "id": "persistent-root-volume",
4709 "type-of-storage": "persistent-storage:persistent-storage",
4710 "size-of-storage": "10",
4711 "vdu-storage-requirements": [
4712 {"key": "keep-volume", "value": "true"},
4715 mock_select_persistent_root_disk
.return_value
= root_disk
4716 persistent_root_disk
= {}
4718 expected_disk_list
= [
4720 "image_id": "ubuntu20.04",
4725 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4726 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4728 self
.assertEqual(disk_list
, expected_disk_list
)
4729 mock_volume_keeping_required
.assert_called_once_with(root_disk
)
4731 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4732 def test_add_persistent_ordinary_disk_to_disk_list(
4733 self
, mock_volume_keeping_required
4735 """Add persistent ordinary disk, keeo volume set to True."""
4736 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4737 mock_volume_keeping_required
.return_value
= False
4738 persistent_root_disk
= {
4739 "persistent-root-volume": {
4740 "image_id": "ubuntu20.04",
4746 "id": "persistent-volume2",
4747 "type-of-storage": "persistent-storage:persistent-storage",
4748 "size-of-storage": "10",
4750 persistent_ordinary_disk
= {}
4753 expected_disk_list
= [
4757 "multiattach": False,
4758 "name": "persistent-volume2",
4761 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4763 persistent_root_disk
,
4764 persistent_ordinary_disk
,
4768 self
.assertEqual(disk_list
, expected_disk_list
)
4769 mock_volume_keeping_required
.assert_called_once_with(ordinary_disk
)
4771 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4772 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
4773 self
, mock_volume_keeping_required
4775 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4776 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4777 mock_volume_keeping_required
.return_value
= False
4778 persistent_root_disk
= {
4779 "persistent-root-volume": {
4780 "image_id": "ubuntu20.04",
4784 "persistent-volume2": {
4788 persistent_ordinary_disk
= {}
4792 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4794 persistent_root_disk
,
4795 persistent_ordinary_disk
,
4799 self
.assertEqual(disk_list
, [])
4800 mock_volume_keeping_required
.assert_not_called()
4802 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4803 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4804 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
4805 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4807 """VNFD does not have persistent storage."""
4808 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4809 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4810 mock_select_persistent_root_disk
.return_value
= None
4811 persistent_root_disk
= {}
4813 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4814 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4816 self
.assertEqual(disk_list
, [])
4817 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4818 mock_volume_keeping_required
.assert_not_called()
4820 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4821 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4822 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
4823 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4825 """Persistent_root_disk dict is empty."""
4826 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4827 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4828 mock_select_persistent_root_disk
.return_value
= None
4829 persistent_root_disk
= {}
4831 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4832 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4834 self
.assertEqual(disk_list
, [])
4835 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4836 mock_volume_keeping_required
.assert_not_called()
4838 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self
):
4839 """Invalid extra dict."""
4840 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4841 target_vdu
["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4843 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4844 with self
.assertRaises(NsException
) as err
:
4845 self
.ns
._prepare
_vdu
_affinity
_group
_list
(target_vdu
, extra_dict
, ns_preffix
)
4846 self
.assertEqual(str(err
.exception
), "Invalid extra_dict format.")
4848 def test_prepare_vdu_affinity_group_list_one_affinity_group(self
):
4849 """There is one affinity-group."""
4850 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4851 target_vdu
["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
4852 extra_dict
= {"depends_on": []}
4853 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4854 affinity_group_txt
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
4855 expected_result
= [{"affinity_group_id": "TASK-" + affinity_group_txt
}]
4856 expected_extra_dict
= {"depends_on": [affinity_group_txt
]}
4857 result
= self
.ns
._prepare
_vdu
_affinity
_group
_list
(
4858 target_vdu
, extra_dict
, ns_preffix
4860 self
.assertDictEqual(extra_dict
, expected_extra_dict
)
4861 self
.assertEqual(result
, expected_result
)
4863 def test_prepare_vdu_affinity_group_list_several_affinity_groups(self
):
4864 """There are two affinity-groups."""
4865 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4866 target_vdu
["affinity-or-anti-affinity-group-id"] = [
4867 "affinity-group-id1",
4868 "affinity-group-id2",
4870 extra_dict
= {"depends_on": []}
4871 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4872 affinity_group_txt1
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1"
4873 affinity_group_txt2
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2"
4875 {"affinity_group_id": "TASK-" + affinity_group_txt1
},
4876 {"affinity_group_id": "TASK-" + affinity_group_txt2
},
4878 expected_extra_dict
= {"depends_on": [affinity_group_txt1
, affinity_group_txt2
]}
4879 result
= self
.ns
._prepare
_vdu
_affinity
_group
_list
(
4880 target_vdu
, extra_dict
, ns_preffix
4882 self
.assertDictEqual(extra_dict
, expected_extra_dict
)
4883 self
.assertEqual(result
, expected_result
)
4885 def test_prepare_vdu_affinity_group_list_no_affinity_group(self
):
4886 """There is not any affinity-group."""
4887 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4888 extra_dict
= {"depends_on": []}
4889 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4890 result
= self
.ns
._prepare
_vdu
_affinity
_group
_list
(
4891 target_vdu
, extra_dict
, ns_preffix
4893 self
.assertDictEqual(extra_dict
, {"depends_on": []})
4894 self
.assertEqual(result
, [])
4896 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4897 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4898 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4899 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4900 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4901 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4902 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4903 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4904 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4905 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4906 def test_process_vdu_params_with_inst_vol_list(
4908 mock_prepare_vdu_affinity_group_list
,
4909 mock_add_persistent_ordinary_disks_to_disk_list
,
4910 mock_add_persistent_root_disk_to_disk_list
,
4911 mock_find_persistent_volumes
,
4912 mock_find_persistent_root_volumes
,
4913 mock_prepare_vdu_ssh_keys
,
4914 mock_prepare_vdu_cloud_init
,
4915 mock_prepare_vdu_interfaces
,
4916 mock_locate_vdu_interfaces
,
4917 mock_sort_vdu_interfaces
,
4919 """Instantiation volume list is empty."""
4920 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4922 target_vdu
["interfaces"] = interfaces_wth_all_positions
4924 vdu_instantiation_vol_list
= [
4926 "vim-volume-id": vim_volume_id
,
4927 "name": "persistent-volume2",
4930 target_vdu
["additionalParams"] = {
4931 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4933 mock_prepare_vdu_cloud_init
.return_value
= {}
4934 mock_prepare_vdu_affinity_group_list
.return_value
= []
4935 persistent_root_disk
= {
4936 "persistent-root-volume": {
4937 "image_id": "ubuntu20.04",
4941 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4943 new_kwargs
= deepcopy(kwargs
)
4948 "tasks_by_target_record_id": {},
4952 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4953 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4954 db
.get_one
.return_value
= vnfd
4955 result
= Ns
._process
_vdu
_params
(
4956 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4958 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4959 mock_locate_vdu_interfaces
.assert_not_called()
4960 mock_prepare_vdu_cloud_init
.assert_called_once()
4961 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4962 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4963 mock_prepare_vdu_interfaces
.assert_called_once_with(
4965 expected_extra_dict_copy
,
4972 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4973 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4974 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4975 mock_find_persistent_volumes
.assert_called_once_with(
4976 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
4979 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4980 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4981 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4982 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4983 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4984 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4985 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4986 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4987 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4988 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4989 def test_process_vdu_params_with_inst_flavor_id(
4991 mock_prepare_vdu_affinity_group_list
,
4992 mock_add_persistent_ordinary_disks_to_disk_list
,
4993 mock_add_persistent_root_disk_to_disk_list
,
4994 mock_find_persistent_volumes
,
4995 mock_find_persistent_root_volumes
,
4996 mock_prepare_vdu_ssh_keys
,
4997 mock_prepare_vdu_cloud_init
,
4998 mock_prepare_vdu_interfaces
,
4999 mock_locate_vdu_interfaces
,
5000 mock_sort_vdu_interfaces
,
5002 """Instantiation volume list is empty."""
5003 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5005 target_vdu
["interfaces"] = interfaces_wth_all_positions
5007 vdu_instantiation_flavor_id
= "flavor_test"
5009 target_vdu
["additionalParams"] = {
5010 "OSM": {"vim_flavor_id": vdu_instantiation_flavor_id
}
5012 mock_prepare_vdu_cloud_init
.return_value
= {}
5013 mock_prepare_vdu_affinity_group_list
.return_value
= []
5015 new_kwargs
= deepcopy(kwargs
)
5020 "tasks_by_target_record_id": {},
5024 expected_extra_dict_copy
= deepcopy(expected_extra_dict3
)
5025 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5026 db
.get_one
.return_value
= vnfd
5027 result
= Ns
._process
_vdu
_params
(
5028 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5030 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5031 mock_locate_vdu_interfaces
.assert_not_called()
5032 mock_prepare_vdu_cloud_init
.assert_called_once()
5033 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5034 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5035 mock_prepare_vdu_interfaces
.assert_called_once_with(
5037 expected_extra_dict_copy
,
5044 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5045 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5046 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5047 mock_find_persistent_volumes
.assert_not_called()
5049 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5050 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5051 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5052 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5053 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5054 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5055 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5056 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5057 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5058 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5059 def test_process_vdu_params_wth_affinity_groups(
5061 mock_prepare_vdu_affinity_group_list
,
5062 mock_add_persistent_ordinary_disks_to_disk_list
,
5063 mock_add_persistent_root_disk_to_disk_list
,
5064 mock_find_persistent_volumes
,
5065 mock_find_persistent_root_volumes
,
5066 mock_prepare_vdu_ssh_keys
,
5067 mock_prepare_vdu_cloud_init
,
5068 mock_prepare_vdu_interfaces
,
5069 mock_locate_vdu_interfaces
,
5070 mock_sort_vdu_interfaces
,
5072 """There is cloud-config."""
5073 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5076 target_vdu
["interfaces"] = interfaces_wth_all_positions
5077 mock_prepare_vdu_cloud_init
.return_value
= {}
5078 mock_prepare_vdu_affinity_group_list
.return_value
= [
5083 new_kwargs
= deepcopy(kwargs
)
5088 "tasks_by_target_record_id": {},
5092 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5093 expected_extra_dict3
["params"]["affinity_group_list"] = [
5097 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5098 db
.get_one
.return_value
= vnfd
5099 result
= Ns
._process
_vdu
_params
(
5100 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5102 self
.assertDictEqual(result
, expected_extra_dict3
)
5103 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5104 mock_locate_vdu_interfaces
.assert_not_called()
5105 mock_prepare_vdu_cloud_init
.assert_called_once()
5106 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5107 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5108 mock_prepare_vdu_interfaces
.assert_called_once_with(
5110 expected_extra_dict3
,
5118 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5119 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5120 mock_find_persistent_volumes
.assert_not_called()
5122 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5123 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5124 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5125 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5126 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5127 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5128 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5129 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5130 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5131 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5132 def test_process_vdu_params_wth_cloud_config(
5134 mock_prepare_vdu_affinity_group_list
,
5135 mock_add_persistent_ordinary_disks_to_disk_list
,
5136 mock_add_persistent_root_disk_to_disk_list
,
5137 mock_find_persistent_volumes
,
5138 mock_find_persistent_root_volumes
,
5139 mock_prepare_vdu_ssh_keys
,
5140 mock_prepare_vdu_cloud_init
,
5141 mock_prepare_vdu_interfaces
,
5142 mock_locate_vdu_interfaces
,
5143 mock_sort_vdu_interfaces
,
5145 """There is cloud-config."""
5146 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5149 target_vdu
["interfaces"] = interfaces_wth_all_positions
5150 mock_prepare_vdu_cloud_init
.return_value
= {
5151 "user-data": user_data
,
5152 "boot-data-drive": "vda",
5154 mock_prepare_vdu_affinity_group_list
.return_value
= []
5156 new_kwargs
= deepcopy(kwargs
)
5161 "tasks_by_target_record_id": {},
5165 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5166 expected_extra_dict3
["params"]["cloud_config"] = {
5167 "user-data": user_data
,
5168 "boot-data-drive": "vda",
5170 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5171 db
.get_one
.return_value
= vnfd
5172 result
= Ns
._process
_vdu
_params
(
5173 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5175 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5176 mock_locate_vdu_interfaces
.assert_not_called()
5177 mock_prepare_vdu_cloud_init
.assert_called_once()
5178 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5179 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5180 mock_prepare_vdu_interfaces
.assert_called_once_with(
5182 expected_extra_dict3
,
5189 self
.assertDictEqual(result
, expected_extra_dict3
)
5190 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
5191 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
5193 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5194 mock_find_persistent_volumes
.assert_not_called()
5196 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5197 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5198 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5199 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5200 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5201 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5202 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5203 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5204 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5205 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5206 def test_process_vdu_params_wthout_persistent_storage(
5208 mock_prepare_vdu_affinity_group_list
,
5209 mock_add_persistent_ordinary_disks_to_disk_list
,
5210 mock_add_persistent_root_disk_to_disk_list
,
5211 mock_find_persistent_volumes
,
5212 mock_find_persistent_root_volumes
,
5213 mock_prepare_vdu_ssh_keys
,
5214 mock_prepare_vdu_cloud_init
,
5215 mock_prepare_vdu_interfaces
,
5216 mock_locate_vdu_interfaces
,
5217 mock_sort_vdu_interfaces
,
5219 """There is not any persistent storage."""
5220 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5223 target_vdu
["interfaces"] = interfaces_wth_all_positions
5224 mock_prepare_vdu_cloud_init
.return_value
= {}
5225 mock_prepare_vdu_affinity_group_list
.return_value
= []
5227 new_kwargs
= deepcopy(kwargs
)
5232 "tasks_by_target_record_id": {},
5236 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
5237 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
5238 db
.get_one
.return_value
= vnfd
5239 result
= Ns
._process
_vdu
_params
(
5240 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5242 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5243 mock_locate_vdu_interfaces
.assert_not_called()
5244 mock_prepare_vdu_cloud_init
.assert_called_once()
5245 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5246 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5247 mock_prepare_vdu_interfaces
.assert_called_once_with(
5249 expected_extra_dict_copy
,
5256 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5257 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5258 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5259 mock_find_persistent_volumes
.assert_not_called()
5261 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5262 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5263 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5264 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5265 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5266 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5267 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5268 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5269 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5270 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5271 def test_process_vdu_params_interfaces_partially_located(
5273 mock_prepare_vdu_affinity_group_list
,
5274 mock_add_persistent_ordinary_disks_to_disk_list
,
5275 mock_add_persistent_root_disk_to_disk_list
,
5276 mock_find_persistent_volumes
,
5277 mock_find_persistent_root_volumes
,
5278 mock_prepare_vdu_ssh_keys
,
5279 mock_prepare_vdu_cloud_init
,
5280 mock_prepare_vdu_interfaces
,
5281 mock_locate_vdu_interfaces
,
5282 mock_sort_vdu_interfaces
,
5284 """Some interfaces have position."""
5285 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5288 target_vdu
["interfaces"] = [
5291 "ns-vld-id": "net1",
5293 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5296 "ns-vld-id": "mgmtnet",
5299 mock_prepare_vdu_cloud_init
.return_value
= {}
5300 mock_prepare_vdu_affinity_group_list
.return_value
= []
5301 persistent_root_disk
= {
5302 "persistent-root-volume": {
5303 "image_id": "ubuntu20.04",
5308 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5310 new_kwargs
= deepcopy(kwargs
)
5315 "tasks_by_target_record_id": {},
5320 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5321 db
.get_one
.return_value
= vnfd
5322 result
= Ns
._process
_vdu
_params
(
5323 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5325 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5326 mock_sort_vdu_interfaces
.assert_not_called()
5327 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5328 mock_prepare_vdu_cloud_init
.assert_called_once()
5329 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5330 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5331 mock_prepare_vdu_interfaces
.assert_called_once_with(
5333 expected_extra_dict_copy
,
5340 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5341 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5342 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5343 mock_find_persistent_volumes
.assert_not_called()
5344 mock_find_persistent_root_volumes
.assert_not_called()
5346 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5347 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5348 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5349 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5350 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5351 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5352 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5353 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5354 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5355 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5356 def test_process_vdu_params_no_interface_position(
5358 mock_prepare_vdu_affinity_group_list
,
5359 mock_add_persistent_ordinary_disks_to_disk_list
,
5360 mock_add_persistent_root_disk_to_disk_list
,
5361 mock_find_persistent_volumes
,
5362 mock_find_persistent_root_volumes
,
5363 mock_prepare_vdu_ssh_keys
,
5364 mock_prepare_vdu_cloud_init
,
5365 mock_prepare_vdu_interfaces
,
5366 mock_locate_vdu_interfaces
,
5367 mock_sort_vdu_interfaces
,
5369 """Interfaces do not have position."""
5370 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5373 target_vdu
["interfaces"] = interfaces_wthout_positions
5374 mock_prepare_vdu_cloud_init
.return_value
= {}
5375 mock_prepare_vdu_affinity_group_list
.return_value
= []
5376 persistent_root_disk
= {
5377 "persistent-root-volume": {
5378 "image_id": "ubuntu20.04",
5383 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5384 new_kwargs
= deepcopy(kwargs
)
5389 "tasks_by_target_record_id": {},
5394 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5395 db
.get_one
.return_value
= vnfd
5396 result
= Ns
._process
_vdu
_params
(
5397 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5399 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5400 mock_sort_vdu_interfaces
.assert_not_called()
5401 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5402 mock_prepare_vdu_cloud_init
.assert_called_once()
5403 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5404 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5405 mock_prepare_vdu_interfaces
.assert_called_once_with(
5407 expected_extra_dict_copy
,
5414 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5415 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5416 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5417 mock_find_persistent_volumes
.assert_not_called()
5418 mock_find_persistent_root_volumes
.assert_not_called()
5420 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5421 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5422 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5423 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5424 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5425 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5426 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5427 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5428 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5429 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5430 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5432 mock_prepare_vdu_affinity_group_list
,
5433 mock_add_persistent_ordinary_disks_to_disk_list
,
5434 mock_add_persistent_root_disk_to_disk_list
,
5435 mock_find_persistent_volumes
,
5436 mock_find_persistent_root_volumes
,
5437 mock_prepare_vdu_ssh_keys
,
5438 mock_prepare_vdu_cloud_init
,
5439 mock_prepare_vdu_interfaces
,
5440 mock_locate_vdu_interfaces
,
5441 mock_sort_vdu_interfaces
,
5443 """Prepare vdu interfaces method raises exception."""
5444 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5447 target_vdu
["interfaces"] = interfaces_wthout_positions
5448 mock_prepare_vdu_cloud_init
.return_value
= {}
5449 mock_prepare_vdu_affinity_group_list
.return_value
= []
5450 persistent_root_disk
= {
5451 "persistent-root-volume": {
5452 "image_id": "ubuntu20.04",
5457 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5458 new_kwargs
= deepcopy(kwargs
)
5463 "tasks_by_target_record_id": {},
5467 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5469 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5470 db
.get_one
.return_value
= vnfd
5471 with self
.assertRaises(Exception) as err
:
5472 Ns
._process
_vdu
_params
(
5473 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5475 self
.assertEqual(type(err
), TypeError)
5476 mock_sort_vdu_interfaces
.assert_not_called()
5477 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5478 mock_prepare_vdu_cloud_init
.assert_not_called()
5479 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5480 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5481 mock_prepare_vdu_interfaces
.assert_called_once()
5482 mock_prepare_vdu_ssh_keys
.assert_not_called()
5483 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5484 mock_find_persistent_volumes
.assert_not_called()
5485 mock_find_persistent_root_volumes
.assert_not_called()
5487 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5488 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5489 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5490 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5491 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5492 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5493 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5494 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5495 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5496 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5497 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5499 mock_prepare_vdu_affinity_group_list
,
5500 mock_add_persistent_ordinary_disks_to_disk_list
,
5501 mock_add_persistent_root_disk_to_disk_list
,
5502 mock_find_persistent_volumes
,
5503 mock_find_persistent_root_volumes
,
5504 mock_prepare_vdu_ssh_keys
,
5505 mock_prepare_vdu_cloud_init
,
5506 mock_prepare_vdu_interfaces
,
5507 mock_locate_vdu_interfaces
,
5508 mock_sort_vdu_interfaces
,
5510 """Add persistent root disk method raises exception."""
5511 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5514 target_vdu
["interfaces"] = interfaces_wthout_positions
5515 mock_prepare_vdu_cloud_init
.return_value
= {}
5516 mock_prepare_vdu_affinity_group_list
.return_value
= []
5517 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5518 new_kwargs
= deepcopy(kwargs
)
5523 "tasks_by_target_record_id": {},
5528 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5529 db
.get_one
.return_value
= vnfd
5530 with self
.assertRaises(Exception) as err
:
5531 Ns
._process
_vdu
_params
(
5532 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5534 self
.assertEqual(type(err
), KeyError)
5535 mock_sort_vdu_interfaces
.assert_not_called()
5536 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5537 mock_prepare_vdu_cloud_init
.assert_called_once()
5538 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5539 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5540 mock_prepare_vdu_interfaces
.assert_called_once_with(
5544 f
"{ns_preffix}:image.0",
5545 f
"{ns_preffix}:flavor.0",
5555 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5556 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5557 mock_find_persistent_volumes
.assert_not_called()
5558 mock_find_persistent_root_volumes
.assert_not_called()
5560 def test_select_persistent_root_disk(self
):
5561 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5562 vdu
["virtual-storage-desc"] = [
5563 "persistent-root-volume",
5564 "persistent-volume2",
5567 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5568 expected_result
= vsd
5569 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5570 self
.assertEqual(result
, expected_result
)
5572 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5573 """VDU first virtual-storage-desc is different than vsd id."""
5574 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5575 vdu
["virtual-storage-desc"] = [
5576 "persistent-volume2",
5577 "persistent-root-volume",
5580 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5581 expected_result
= None
5582 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5583 self
.assertEqual(result
, expected_result
)
5585 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5586 """vsd type is not persistent."""
5587 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5588 vdu
["virtual-storage-desc"] = [
5589 "persistent-volume2",
5590 "persistent-root-volume",
5593 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5594 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5595 expected_result
= None
5596 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5597 self
.assertEqual(result
, expected_result
)
5599 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5600 """vsd size is None."""
5601 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5602 vdu
["virtual-storage-desc"] = [
5603 "persistent-volume2",
5604 "persistent-root-volume",
5607 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5608 vsd
["size-of-storage"] = None
5609 expected_result
= None
5610 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5611 self
.assertEqual(result
, expected_result
)
5613 def test_select_persistent_root_disk_vdu_wthout_vsd(self
):
5614 """VDU does not have virtual-storage-desc."""
5615 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5616 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5617 expected_result
= None
5618 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5619 self
.assertEqual(result
, expected_result
)
5621 def test_select_persistent_root_disk_invalid_vsd_type(self
):
5622 """vsd is list, expected to be a dict."""
5623 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5624 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"]
5625 with self
.assertRaises(AttributeError):
5626 Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)