1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
74 "vdu-storage-requirements": [
75 {"key": "keep-volume", "value": "true"},
79 "id": "ephemeral-volume",
80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
81 "size-of-storage": "1",
87 "path": "/app/storage/",
92 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
93 task_by_target_record_id
= {
94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
95 "extra_dict": {"params": {"net_type": "SR-IOV"}}
98 interfaces_wthout_positions
= [
109 "ns-vld-id": "mgmtnet",
112 interfaces_wth_all_positions
= [
125 "ns-vld-id": "mgmtnet",
129 target_vdu_wth_persistent_storage
= {
130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
133 "vdu-name": "several_volumes-VM",
137 "ns-vld-id": "mgmtnet",
140 "virtual-storages": [
142 "id": "persistent-volume2",
143 "size-of-storage": "10",
144 "type-of-storage": "persistent-storage:persistent-storage",
147 "id": "persistent-root-volume",
148 "size-of-storage": "10",
149 "type-of-storage": "persistent-storage:persistent-storage",
150 "vdu-storage-requirements": [
151 {"key": "keep-volume", "value": "true"},
155 "id": "ephemeral-volume",
156 "size-of-storage": "1",
157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
161 db
= MagicMock(name
="database mock")
162 fs
= MagicMock(name
="database mock")
163 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
164 vnf_preffix
= "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
165 vnfr_id
= "wh47f48-y870-4169-b758-5732e1ff40f5"
166 nsr_id
= "th47f48-9870-4169-b758-9732e1ff40f3"
168 "name": "sample_name",
170 expected_extra_dict
= {
172 f
"{ns_preffix}:image.0",
173 f
"{ns_preffix}:flavor.0",
176 "affinity_group_list": [],
177 "availability_zone_index": None,
178 "availability_zone_list": None,
179 "cloud_config": None,
180 "description": "several_volumes-VM",
182 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
183 "image_id": f
"TASK-{ns_preffix}:image.0",
184 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
190 expected_extra_dict2
= {
192 f
"{ns_preffix}:image.0",
193 f
"{ns_preffix}:flavor.0",
196 "affinity_group_list": [],
197 "availability_zone_index": None,
198 "availability_zone_list": None,
199 "cloud_config": None,
200 "description": "without_volumes-VM",
202 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
203 "image_id": f
"TASK-{ns_preffix}:image.0",
204 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
209 tasks_by_target_record_id
= {
210 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
213 "net_type": "SR-IOV",
220 "vdu2cloud_init": {},
222 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
223 "member-vnf-index-ref": "vnf-several-volumes",
226 vnfd_wthout_persistent_storage
= {
227 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
231 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
234 "id": "without_volumes-vnf",
235 "product-name": "without_volumes-vnf",
238 "id": "without_volumes-VM",
239 "name": "without_volumes-VM",
240 "sw-image-desc": "ubuntu20.04",
241 "alternative-sw-image-desc": [
245 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
249 "virtual-storage-desc": [
250 {"id": "root-volume", "size-of-storage": "10"},
252 "id": "ephemeral-volume",
253 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
254 "size-of-storage": "1",
260 "path": "/app/storage/",
266 target_vdu_wthout_persistent_storage
= {
267 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
270 "vdu-name": "without_volumes-VM",
274 "ns-vld-id": "mgmtnet",
277 "virtual-storages": [
280 "size-of-storage": "10",
283 "id": "ephemeral-volume",
284 "size-of-storage": "1",
285 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
289 cloud_init_content
= """
294 overwrite: {{is_override}}
297 - [ sh, -xc, "echo $(date) '{{command}}'" ]
308 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
class CopyingMock(MagicMock):
    """MagicMock variant that records deep copies of every call's arguments.

    A plain MagicMock keeps references to the objects it was called with, so
    a mutable argument that is modified after the call also changes what the
    mock reports in ``call_args``.  Copying at call time lets assertions see
    the argument values exactly as they were when the call happened.
    """

    def __call__(self, *args, **kwargs):
        """Deep-copy positional and keyword arguments, then record the call.

        Returns whatever the underlying MagicMock call returns (its
        ``return_value`` or the result of its ``side_effect``).
        """
        # Use fresh names instead of rebinding the parameters so it stays
        # obvious which objects are the caller's and which are the snapshots.
        copied_args = deepcopy(args)
        copied_kwargs = deepcopy(kwargs)
        return super().__call__(*copied_args, **copied_kwargs)
319 class TestNs(unittest
.TestCase
):
323 def test__create_task_without_extra_dict(self
):
325 "target_id": "vim_openstack_1",
326 "action_id": "123456",
328 "task_id": "123456:1",
329 "status": "SCHEDULED",
332 "target_record": "test_target_record",
333 "target_record_id": "test_target_record_id",
336 "action_id": "123456",
341 task
= Ns
._create
_task
(
342 deployment_info
=deployment_info
,
343 target_id
="vim_openstack_1",
346 target_record
="test_target_record",
347 target_record_id
="test_target_record_id",
350 self
.assertEqual(deployment_info
.get("task_index"), 2)
351 self
.assertDictEqual(task
, expected_result
)
353 def test__create_task(self
):
355 "target_id": "vim_openstack_1",
356 "action_id": "123456",
358 "task_id": "123456:1",
359 "status": "SCHEDULED",
362 "target_record": "test_target_record",
363 "target_record_id": "test_target_record_id",
364 # values coming from extra_dict
365 "params": "test_params",
366 "find_params": "test_find_params",
367 "depends_on": "test_depends_on",
370 "action_id": "123456",
375 task
= Ns
._create
_task
(
376 deployment_info
=deployment_info
,
377 target_id
="vim_openstack_1",
380 target_record
="test_target_record",
381 target_record_id
="test_target_record_id",
383 "params": "test_params",
384 "find_params": "test_find_params",
385 "depends_on": "test_depends_on",
389 self
.assertEqual(deployment_info
.get("task_index"), 2)
390 self
.assertDictEqual(task
, expected_result
)
392 @patch("osm_ng_ro.ns.time")
393 def test__create_ro_task(self
, mock_time
: Mock
):
394 now
= 1637324838.994551
395 mock_time
.return_value
= now
397 "target_id": "vim_openstack_1",
398 "action_id": "123456",
400 "task_id": "123456:1",
401 "status": "SCHEDULED",
404 "target_record": "test_target_record",
405 "target_record_id": "test_target_record_id",
406 # values coming from extra_dict
407 "params": "test_params",
408 "find_params": "test_find_params",
409 "depends_on": "test_depends_on",
415 "target_id": "vim_openstack_1",
418 "created_items": None,
432 ro_task
= Ns
._create
_ro
_task
(
433 target_id
="vim_openstack_1",
437 self
.assertDictEqual(ro_task
, expected_result
)
439 def test__process_image_params_with_empty_target_image(self
):
445 result
= Ns
._process
_image
_params
(
446 target_image
=target_image
,
449 target_record_id
=None,
452 self
.assertDictEqual(expected_result
, result
)
454 def test__process_image_params_with_wrong_target_image(self
):
459 "no_image": "to_see_here",
462 result
= Ns
._process
_image
_params
(
463 target_image
=target_image
,
466 target_record_id
=None,
469 self
.assertDictEqual(expected_result
, result
)
471 def test__process_image_params_with_image(self
):
483 result
= Ns
._process
_image
_params
(
484 target_image
=target_image
,
487 target_record_id
=None,
490 self
.assertDictEqual(expected_result
, result
)
492 def test__process_image_params_with_vim_image_id(self
):
501 "vim_image_id": "123456",
504 result
= Ns
._process
_image
_params
(
505 target_image
=target_image
,
508 target_record_id
=None,
511 self
.assertDictEqual(expected_result
, result
)
513 def test__process_image_params_with_image_checksum(self
):
517 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
522 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
525 result
= Ns
._process
_image
_params
(
526 target_image
=target_image
,
529 target_record_id
=None,
532 self
.assertDictEqual(expected_result
, result
)
534 def test__get_resource_allocation_params_with_empty_target_image(self
):
536 quota_descriptor
= {}
538 result
= Ns
._get
_resource
_allocation
_params
(
539 quota_descriptor
=quota_descriptor
,
542 self
.assertDictEqual(expected_result
, result
)
544 def test__get_resource_allocation_params_with_wrong_target_image(self
):
547 "no_quota": "present_here",
550 result
= Ns
._get
_resource
_allocation
_params
(
551 quota_descriptor
=quota_descriptor
,
554 self
.assertDictEqual(expected_result
, result
)
556 def test__get_resource_allocation_params_with_limit(self
):
564 result
= Ns
._get
_resource
_allocation
_params
(
565 quota_descriptor
=quota_descriptor
,
568 self
.assertDictEqual(expected_result
, result
)
570 def test__get_resource_allocation_params_with_reserve(self
):
578 result
= Ns
._get
_resource
_allocation
_params
(
579 quota_descriptor
=quota_descriptor
,
582 self
.assertDictEqual(expected_result
, result
)
584 def test__get_resource_allocation_params_with_shares(self
):
592 result
= Ns
._get
_resource
_allocation
_params
(
593 quota_descriptor
=quota_descriptor
,
596 self
.assertDictEqual(expected_result
, result
)
598 def test__get_resource_allocation_params(self
):
610 result
= Ns
._get
_resource
_allocation
_params
(
611 quota_descriptor
=quota_descriptor
,
614 self
.assertDictEqual(expected_result
, result
)
616 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
617 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
625 result
= Ns
._process
_guest
_epa
_quota
_params
(
626 guest_epa_quota
=guest_epa_quota
,
627 epa_vcpu_set
=epa_vcpu_set
,
630 self
.assertDictEqual(expected_result
, result
)
631 self
.assertFalse(resource_allocation
.called
)
633 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
634 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
642 result
= Ns
._process
_guest
_epa
_quota
_params
(
643 guest_epa_quota
=guest_epa_quota
,
644 epa_vcpu_set
=epa_vcpu_set
,
647 self
.assertDictEqual(expected_result
, result
)
648 self
.assertFalse(resource_allocation
.called
)
650 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
651 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
657 "no-quota": "nothing",
661 result
= Ns
._process
_guest
_epa
_quota
_params
(
662 guest_epa_quota
=guest_epa_quota
,
663 epa_vcpu_set
=epa_vcpu_set
,
666 self
.assertDictEqual(expected_result
, result
)
667 self
.assertFalse(resource_allocation
.called
)
669 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
670 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
676 "no-quota": "nothing",
680 result
= Ns
._process
_guest
_epa
_quota
_params
(
681 guest_epa_quota
=guest_epa_quota
,
682 epa_vcpu_set
=epa_vcpu_set
,
685 self
.assertDictEqual(expected_result
, result
)
686 self
.assertFalse(resource_allocation
.called
)
688 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
689 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
703 result
= Ns
._process
_guest
_epa
_quota
_params
(
704 guest_epa_quota
=guest_epa_quota
,
705 epa_vcpu_set
=epa_vcpu_set
,
708 self
.assertDictEqual(expected_result
, result
)
709 self
.assertFalse(resource_allocation
.called
)
711 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
712 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
732 resource_allocation_param
= {
737 resource_allocation
.return_value
= {
743 result
= Ns
._process
_guest
_epa
_quota
_params
(
744 guest_epa_quota
=guest_epa_quota
,
745 epa_vcpu_set
=epa_vcpu_set
,
748 resource_allocation
.assert_called_once_with(resource_allocation_param
)
749 self
.assertDictEqual(expected_result
, result
)
751 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
752 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
772 resource_allocation_param
= {
777 resource_allocation
.return_value
= {
783 result
= Ns
._process
_guest
_epa
_quota
_params
(
784 guest_epa_quota
=guest_epa_quota
,
785 epa_vcpu_set
=epa_vcpu_set
,
788 resource_allocation
.assert_called_once_with(resource_allocation_param
)
789 self
.assertDictEqual(expected_result
, result
)
791 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
792 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
812 resource_allocation_param
= {
817 resource_allocation
.return_value
= {
823 result
= Ns
._process
_guest
_epa
_quota
_params
(
824 guest_epa_quota
=guest_epa_quota
,
825 epa_vcpu_set
=epa_vcpu_set
,
828 resource_allocation
.assert_called_once_with(resource_allocation_param
)
829 self
.assertDictEqual(expected_result
, result
)
831 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
832 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
852 resource_allocation_param
= {
857 resource_allocation
.return_value
= {
863 result
= Ns
._process
_guest
_epa
_quota
_params
(
864 guest_epa_quota
=guest_epa_quota
,
865 epa_vcpu_set
=epa_vcpu_set
,
868 resource_allocation
.assert_called_once_with(resource_allocation_param
)
869 self
.assertDictEqual(expected_result
, result
)
871 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
872 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
892 resource_allocation_param
= {
897 resource_allocation
.return_value
= {
903 result
= Ns
._process
_guest
_epa
_quota
_params
(
904 guest_epa_quota
=guest_epa_quota
,
905 epa_vcpu_set
=epa_vcpu_set
,
908 resource_allocation
.assert_called_once_with(resource_allocation_param
)
909 self
.assertDictEqual(expected_result
, result
)
911 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
912 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
932 resource_allocation_param
= {
937 resource_allocation
.return_value
= {
943 result
= Ns
._process
_guest
_epa
_quota
_params
(
944 guest_epa_quota
=guest_epa_quota
,
945 epa_vcpu_set
=epa_vcpu_set
,
948 resource_allocation
.assert_called_once_with(resource_allocation_param
)
949 self
.assertDictEqual(expected_result
, result
)
951 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
952 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
972 resource_allocation_param
= {
977 resource_allocation
.return_value
= {
983 result
= Ns
._process
_guest
_epa
_quota
_params
(
984 guest_epa_quota
=guest_epa_quota
,
985 epa_vcpu_set
=epa_vcpu_set
,
988 resource_allocation
.assert_called_once_with(resource_allocation_param
)
989 self
.assertDictEqual(expected_result
, result
)
991 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
992 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1037 resource_allocation
.return_value
= {
1043 result
= Ns
._process
_guest
_epa
_quota
_params
(
1044 guest_epa_quota
=guest_epa_quota
,
1045 epa_vcpu_set
=epa_vcpu_set
,
1048 self
.assertTrue(resource_allocation
.called
)
1049 self
.assertDictEqual(expected_result
, result
)
1051 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1052 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1054 resource_allocation
,
1100 epa_vcpu_set
= False
1102 resource_allocation
.return_value
= {
1108 result
= Ns
._process
_guest
_epa
_quota
_params
(
1109 guest_epa_quota
=guest_epa_quota
,
1110 epa_vcpu_set
=epa_vcpu_set
,
1113 self
.assertTrue(resource_allocation
.called
)
1114 self
.assertDictEqual(expected_result
, result
)
def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    """An empty guest EPA dict is expected to give no NUMA entries and a False vCPU flag."""
    empty_guest_epa = {}

    numa_list, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_guest_epa,
    )

    # Expected value goes first, matching the argument order used across this test class.
    self.assertEqual([], numa_list)
    self.assertEqual(False, vcpu_flag)
def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    """A guest EPA dict holding only an unrelated key is expected to give no NUMA entries and a False vCPU flag."""
    unrelated_guest_epa = {"no_nume": "here"}

    numa_list, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_guest_epa,
    )

    # Expected value goes first, matching the argument order used across this test class.
    self.assertEqual([], numa_list)
    self.assertEqual(False, vcpu_flag)
def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    """An empty "numa-node-policy" entry is expected to give no NUMA entries and a False vCPU flag."""
    guest_epa_with_empty_policy = {"numa-node-policy": {}}

    numa_list, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_with_empty_policy,
    )

    # Expected value goes first, matching the argument order used across this test class.
    self.assertEqual([], numa_list)
    self.assertEqual(False, vcpu_flag)
1151 def test__process_guest_epa_numa_params_with_no_node(self
):
1152 expected_numa_result
= []
1153 expected_epa_vcpu_set_result
= False
1155 "numa-node-policy": {
1160 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1161 guest_epa_quota
=guest_epa_quota
,
1164 self
.assertEqual(expected_numa_result
, numa_result
)
1165 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1167 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1168 expected_numa_result
= [{"cores": 3}]
1169 expected_epa_vcpu_set_result
= True
1171 "numa-node-policy": {
1180 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1181 guest_epa_quota
=guest_epa_quota
,
1184 self
.assertEqual(expected_numa_result
, numa_result
)
1185 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1187 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1188 expected_numa_result
= [{"paired_threads": 3}]
1189 expected_epa_vcpu_set_result
= True
1191 "numa-node-policy": {
1194 "paired-threads": {"num-paired-threads": "3"},
1200 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1201 guest_epa_quota
=guest_epa_quota
,
1204 self
.assertEqual(expected_numa_result
, numa_result
)
1205 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1207 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1208 expected_numa_result
= [
1210 "paired-threads-id": [("0", "1"), ("4", "5")],
1213 expected_epa_vcpu_set_result
= False
1215 "numa-node-policy": {
1219 "paired-thread-ids": [
1235 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1236 guest_epa_quota
=guest_epa_quota
,
1239 self
.assertEqual(expected_numa_result
, numa_result
)
1240 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1242 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1243 expected_numa_result
= [{"threads": 3}]
1244 expected_epa_vcpu_set_result
= True
1246 "numa-node-policy": {
1255 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1256 guest_epa_quota
=guest_epa_quota
,
1259 self
.assertEqual(expected_numa_result
, numa_result
)
1260 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1262 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1263 expected_numa_result
= [{"memory": 2}]
1264 expected_epa_vcpu_set_result
= False
1266 "numa-node-policy": {
1275 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1276 guest_epa_quota
=guest_epa_quota
,
1279 self
.assertEqual(expected_numa_result
, numa_result
)
1280 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1282 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1283 expected_numa_result
= [
1289 expected_epa_vcpu_set_result
= False
1291 "numa-node-policy": {
1292 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1296 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1297 guest_epa_quota
=guest_epa_quota
,
1300 self
.assertEqual(expected_numa_result
, numa_result
)
1301 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1303 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1304 expected_numa_result
= [
1315 expected_epa_vcpu_set_result
= False
1317 "numa-node-policy": {
1319 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1320 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1325 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1326 guest_epa_quota
=guest_epa_quota
,
1329 self
.assertEqual(expected_numa_result
, numa_result
)
1330 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1332 def test__process_guest_epa_numa_params_with_1_node(self
):
1333 expected_numa_result
= [
1338 "paired_threads": 3,
1339 "paired-threads-id": [("0", "1"), ("4", "5")],
1344 expected_epa_vcpu_set_result
= True
1346 "numa-node-policy": {
1351 "num-paired-threads": "3",
1352 "paired-thread-ids": [
1370 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1371 guest_epa_quota
=guest_epa_quota
,
1374 self
.assertEqual(expected_numa_result
, numa_result
)
1375 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1377 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1378 expected_numa_result
= [
1381 "paired_threads": 3,
1382 "paired-threads-id": [("0", "1"), ("4", "5")],
1388 "paired_threads": 7,
1389 "paired-threads-id": [("2", "3"), ("5", "6")],
1394 expected_epa_vcpu_set_result
= True
1396 "numa-node-policy": {
1401 "num-paired-threads": "3",
1402 "paired-thread-ids": [
1419 "num-paired-threads": "7",
1420 "paired-thread-ids": [
1438 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1439 guest_epa_quota
=guest_epa_quota
,
1442 self
.assertEqual(expected_numa_result
, numa_result
)
1443 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1445 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1446 expected_numa_result
= {}
1447 expected_epa_vcpu_set_result
= False
1448 guest_epa_quota
= {}
1450 epa_vcpu_set
= False
1452 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1453 guest_epa_quota
=guest_epa_quota
,
1454 vcpu_count
=vcpu_count
,
1455 epa_vcpu_set
=epa_vcpu_set
,
1458 self
.assertDictEqual(expected_numa_result
, numa_result
)
1459 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1461 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1462 expected_numa_result
= {}
1463 expected_epa_vcpu_set_result
= False
1465 "no-cpu-pinning-policy": "here",
1468 epa_vcpu_set
= False
1470 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1471 guest_epa_quota
=guest_epa_quota
,
1472 vcpu_count
=vcpu_count
,
1473 epa_vcpu_set
=epa_vcpu_set
,
1476 self
.assertDictEqual(expected_numa_result
, numa_result
)
1477 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1479 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1480 expected_numa_result
= {}
1481 expected_epa_vcpu_set_result
= True
1483 "cpu-pinning-policy": "DEDICATED",
1488 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1489 guest_epa_quota
=guest_epa_quota
,
1490 vcpu_count
=vcpu_count
,
1491 epa_vcpu_set
=epa_vcpu_set
,
1494 self
.assertDictEqual(expected_numa_result
, numa_result
)
1495 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1497 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self
):
1498 expected_numa_result
= {"threads": 3}
1499 expected_epa_vcpu_set_result
= True
1501 "cpu-pinning-policy": "DEDICATED",
1502 "cpu-thread-pinning-policy": "PREFER",
1505 epa_vcpu_set
= False
1507 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1508 guest_epa_quota
=guest_epa_quota
,
1509 vcpu_count
=vcpu_count
,
1510 epa_vcpu_set
=epa_vcpu_set
,
1513 self
.assertDictEqual(expected_numa_result
, numa_result
)
1514 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1516 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self
):
1517 expected_numa_result
= {"cores": 3}
1518 expected_epa_vcpu_set_result
= True
1520 "cpu-pinning-policy": "DEDICATED",
1521 "cpu-thread-pinning-policy": "ISOLATE",
1524 epa_vcpu_set
= False
1526 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1527 guest_epa_quota
=guest_epa_quota
,
1528 vcpu_count
=vcpu_count
,
1529 epa_vcpu_set
=epa_vcpu_set
,
1532 self
.assertDictEqual(expected_numa_result
, numa_result
)
1533 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1535 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self
):
1536 expected_numa_result
= {"threads": 3}
1537 expected_epa_vcpu_set_result
= True
1539 "cpu-pinning-policy": "DEDICATED",
1540 "cpu-thread-pinning-policy": "REQUIRE",
1543 epa_vcpu_set
= False
1545 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1546 guest_epa_quota
=guest_epa_quota
,
1547 vcpu_count
=vcpu_count
,
1548 epa_vcpu_set
=epa_vcpu_set
,
1551 self
.assertDictEqual(expected_numa_result
, numa_result
)
1552 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1554 def test__process_guest_epa_cpu_pinning_params(self
):
1555 expected_numa_result
= {"threads": 3}
1556 expected_epa_vcpu_set_result
= True
1558 "cpu-pinning-policy": "DEDICATED",
1561 epa_vcpu_set
= False
1563 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1564 guest_epa_quota
=guest_epa_quota
,
1565 vcpu_count
=vcpu_count
,
1566 epa_vcpu_set
=epa_vcpu_set
,
1569 self
.assertDictEqual(expected_numa_result
, numa_result
)
1570 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1572 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1573 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1574 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1575 def test__process_guest_epa_params_with_empty_params(
1577 guest_epa_numa_params
,
1578 guest_epa_cpu_pinning_params
,
1579 guest_epa_quota_params
,
1581 expected_result
= {}
1584 result
= Ns
._process
_epa
_params
(
1585 target_flavor
=target_flavor
,
1588 self
.assertDictEqual(expected_result
, result
)
1589 self
.assertFalse(guest_epa_numa_params
.called
)
1590 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1591 self
.assertFalse(guest_epa_quota_params
.called
)
1593 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1594 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1595 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1596 def test__process_guest_epa_params_with_wrong_params(
1598 guest_epa_numa_params
,
1599 guest_epa_cpu_pinning_params
,
1600 guest_epa_quota_params
,
1602 expected_result
= {}
1604 "no-guest-epa": "here",
1607 result
= Ns
._process
_epa
_params
(
1608 target_flavor
=target_flavor
,
1611 self
.assertDictEqual(expected_result
, result
)
1612 self
.assertFalse(guest_epa_numa_params
.called
)
1613 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1614 self
.assertFalse(guest_epa_quota_params
.called
)
1616 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1617 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1618 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1619 def test__process_guest_epa_params(
1621 guest_epa_numa_params
,
1622 guest_epa_cpu_pinning_params
,
1623 guest_epa_quota_params
,
1626 "mem-policy": "STRICT",
1631 "numa-node-policy": {
1632 "mem-policy": "STRICT",
1637 guest_epa_numa_params
.return_value
= ({}, False)
1638 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1639 guest_epa_quota_params
.return_value
= {}
1641 result
= Ns
._process
_epa
_params
(
1642 target_flavor
=target_flavor
,
1645 self
.assertDictEqual(expected_result
, result
)
1646 self
.assertTrue(guest_epa_numa_params
.called
)
1647 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1648 self
.assertTrue(guest_epa_quota_params
.called
)
1650 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1651 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1652 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1653 def test__process_guest_epa_params_with_mempage_size(
1655 guest_epa_numa_params
,
1656 guest_epa_cpu_pinning_params
,
1657 guest_epa_quota_params
,
1660 "mempage-size": "1G",
1661 "mem-policy": "STRICT",
1666 "mempage-size": "1G",
1667 "numa-node-policy": {
1668 "mem-policy": "STRICT",
1673 guest_epa_numa_params
.return_value
= ({}, False)
1674 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1675 guest_epa_quota_params
.return_value
= {}
1677 result
= Ns
._process
_epa
_params
(
1678 target_flavor
=target_flavor
,
1681 self
.assertDictEqual(expected_result
, result
)
1682 self
.assertTrue(guest_epa_numa_params
.called
)
1683 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1684 self
.assertTrue(guest_epa_quota_params
.called
)
1686 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1687 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1688 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1689 def test__process_guest_epa_params_with_numa(
1691 guest_epa_numa_params
,
1692 guest_epa_cpu_pinning_params
,
1693 guest_epa_quota_params
,
1696 "mempage-size": "1G",
1697 "cpu-pinning-policy": "DEDICATED",
1698 "cpu-thread-pinning-policy": "PREFER",
1703 "paired-threads": 3,
1704 "paired-threads-id": [("0", "1"), ("4", "5")],
1708 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1709 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1710 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1711 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1716 "mempage-size": "1G",
1717 "cpu-pinning-policy": "DEDICATED",
1718 "cpu-thread-pinning-policy": "PREFER",
1719 "numa-node-policy": {
1724 "num-paired-threads": "3",
1725 "paired-thread-ids": [
1764 guest_epa_numa_params
.return_value
= (
1768 "paired-threads": 3,
1769 "paired-threads-id": [("0", "1"), ("4", "5")],
1776 guest_epa_cpu_pinning_params
.return_value
= (
1782 guest_epa_quota_params
.return_value
= {
1805 result
= Ns
._process
_epa
_params
(
1806 target_flavor
=target_flavor
,
1808 self
.assertEqual(expected_result
, result
)
1809 self
.assertTrue(guest_epa_numa_params
.called
)
1810 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1811 self
.assertTrue(guest_epa_quota_params
.called
)
1813 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1814 def test__process_flavor_params_with_empty_target_flavor(
1822 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1827 target_record_id
= ""
1829 with self
.assertRaises(KeyError):
1830 Ns
._process
_flavor
_params
(
1831 target_flavor
=target_flavor
,
1834 target_record_id
=target_record_id
,
1837 self
.assertFalse(epa_params
.called
)
1839 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1840 def test__process_flavor_params_with_wrong_target_flavor(
1845 "no-target-flavor": "here",
1849 target_record_id
= ""
1851 with self
.assertRaises(KeyError):
1852 Ns
._process
_flavor
_params
(
1853 target_flavor
=target_flavor
,
1856 target_record_id
=target_record_id
,
1859 self
.assertFalse(epa_params
.called
)
1861 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1862 def test__process_flavor_params_with_empty_indata(
1886 "memory-mb": "1024",
1891 target_record_id
= ""
1893 epa_params
.return_value
= {}
1895 result
= Ns
._process
_flavor
_params
(
1896 target_flavor
=target_flavor
,
1899 target_record_id
=target_record_id
,
1902 self
.assertTrue(epa_params
.called
)
1903 self
.assertDictEqual(result
, expected_result
)
1905 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1906 def test__process_flavor_params_with_wrong_indata(
1930 "memory-mb": "1024",
1937 target_record_id
= ""
1939 epa_params
.return_value
= {}
1941 result
= Ns
._process
_flavor
_params
(
1942 target_flavor
=target_flavor
,
1945 target_record_id
=target_record_id
,
1948 self
.assertTrue(epa_params
.called
)
1949 self
.assertDictEqual(result
, expected_result
)
1951 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1952 def test__process_flavor_params_with_ephemeral_disk(
1960 db
.get_one
.return_value
= {
1961 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1966 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1970 "id": "without_volumes-vnf",
1971 "product-name": "without_volumes-vnf",
1974 "id": "without_volumes-VM",
1975 "name": "without_volumes-VM",
1976 "sw-image-desc": "ubuntu20.04",
1977 "alternative-sw-image-desc": [
1979 "ubuntu20.04-azure",
1981 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
1985 "virtual-storage-desc": [
1986 {"id": "root-volume", "size-of-storage": "10"},
1988 "id": "ephemeral-volume",
1989 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1990 "size-of-storage": "1",
1996 "path": "/app/storage/",
2024 "memory-mb": "1024",
2032 "ns-flavor-id": "test_id",
2033 "virtual-storages": [
2035 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2036 "size-of-storage": "10",
2041 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2046 target_record_id
= ""
2048 epa_params
.return_value
= {}
2050 result
= Ns
._process
_flavor
_params
(
2051 target_flavor
=target_flavor
,
2054 target_record_id
=target_record_id
,
2058 self
.assertTrue(epa_params
.called
)
2059 self
.assertDictEqual(result
, expected_result
)
2061 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2062 def test__process_flavor_params_with_swap_disk(
2089 "memory-mb": "1024",
2097 "ns-flavor-id": "test_id",
2098 "virtual-storages": [
2100 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2101 "size-of-storage": "20",
2106 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2111 target_record_id
= ""
2113 epa_params
.return_value
= {}
2115 result
= Ns
._process
_flavor
_params
(
2116 target_flavor
=target_flavor
,
2119 target_record_id
=target_record_id
,
2122 self
.assertTrue(epa_params
.called
)
2123 self
.assertDictEqual(result
, expected_result
)
2125 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2126 def test__process_flavor_params_with_persistent_root_disk(
2134 db
.get_one
.return_value
= {
2135 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2140 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2144 "id": "several_volumes-vnf",
2145 "product-name": "several_volumes-vnf",
2148 "id": "several_volumes-VM",
2149 "name": "several_volumes-VM",
2150 "sw-image-desc": "ubuntu20.04",
2151 "alternative-sw-image-desc": [
2153 "ubuntu20.04-azure",
2155 "virtual-storage-desc": [
2156 "persistent-root-volume",
2161 "virtual-storage-desc": [
2163 "id": "persistent-root-volume",
2164 "type-of-storage": "persistent-storage:persistent-storage",
2165 "size-of-storage": "10",
2171 "path": "/app/storage/",
2197 "memory-mb": "1024",
2205 "vdu-name": "several_volumes-VM",
2206 "ns-flavor-id": "test_id",
2207 "virtual-storages": [
2209 "type-of-storage": "persistent-storage:persistent-storage",
2210 "size-of-storage": "10",
2215 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2220 target_record_id
= ""
2222 epa_params
.return_value
= {}
2224 result
= Ns
._process
_flavor
_params
(
2225 target_flavor
=target_flavor
,
2228 target_record_id
=target_record_id
,
2232 self
.assertTrue(epa_params
.called
)
2233 self
.assertDictEqual(result
, expected_result
)
2235 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2236 def test__process_flavor_params_with_epa_params(
2247 "numa": "there-is-numa-here",
2258 "numa": "there-is-numa-here",
2267 "memory-mb": "1024",
2275 "ns-flavor-id": "test_id",
2278 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2283 target_record_id
= ""
2285 epa_params
.return_value
= {
2286 "numa": "there-is-numa-here",
2289 result
= Ns
._process
_flavor
_params
(
2290 target_flavor
=target_flavor
,
2293 target_record_id
=target_record_id
,
2296 self
.assertTrue(epa_params
.called
)
2297 self
.assertDictEqual(result
, expected_result
)
2299 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2300 def test__process_flavor_params(
2308 db
.get_one
.return_value
= {
2309 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2314 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2318 "id": "without_volumes-vnf",
2319 "product-name": "without_volumes-vnf",
2322 "id": "without_volumes-VM",
2323 "name": "without_volumes-VM",
2324 "sw-image-desc": "ubuntu20.04",
2325 "alternative-sw-image-desc": [
2327 "ubuntu20.04-azure",
2329 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2333 "virtual-storage-desc": [
2334 {"id": "root-volume", "size-of-storage": "10"},
2336 "id": "ephemeral-volume",
2337 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2338 "size-of-storage": "1",
2344 "path": "/app/storage/",
2359 "numa": "there-is-numa-here",
2372 "numa": "there-is-numa-here",
2381 "memory-mb": "1024",
2389 "ns-flavor-id": "test_id",
2390 "virtual-storages": [
2392 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2393 "size-of-storage": "10",
2396 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2397 "size-of-storage": "20",
2402 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2407 target_record_id
= ""
2409 epa_params
.return_value
= {
2410 "numa": "there-is-numa-here",
2413 result
= Ns
._process
_flavor
_params
(
2414 target_flavor
=target_flavor
,
2417 target_record_id
=target_record_id
,
2421 self
.assertTrue(epa_params
.called
)
2422 self
.assertDictEqual(result
, expected_result
)
2424 def test__ip_profile_to_ro_with_none(self
):
2427 result
= Ns
._ip
_profile
_to
_ro
(
2428 ip_profile
=ip_profile
,
2431 self
.assertIsNone(result
)
2433 def test__ip_profile_to_ro_with_empty_profile(self
):
2436 result
= Ns
._ip
_profile
_to
_ro
(
2437 ip_profile
=ip_profile
,
2440 self
.assertIsNone(result
)
2442 def test__ip_profile_to_ro_with_wrong_profile(self
):
2444 "no-profile": "here",
2447 "ip_version": "IPv4",
2448 "subnet_address": None,
2449 "gateway_address": None,
2450 "dhcp_enabled": False,
2451 "dhcp_start_address": None,
2455 result
= Ns
._ip
_profile
_to
_ro
(
2456 ip_profile
=ip_profile
,
2459 self
.assertDictEqual(expected_result
, result
)
2461 def test__ip_profile_to_ro_with_ipv4_profile(self
):
2463 "ip-version": "ipv4",
2464 "subnet-address": "192.168.0.0/24",
2465 "gateway-address": "192.168.0.254",
2468 "start-address": "192.168.0.10",
2473 "ip_version": "IPv4",
2474 "subnet_address": "192.168.0.0/24",
2475 "gateway_address": "192.168.0.254",
2476 "dhcp_enabled": True,
2477 "dhcp_start_address": "192.168.0.10",
2481 result
= Ns
._ip
_profile
_to
_ro
(
2482 ip_profile
=ip_profile
,
2485 self
.assertDictEqual(expected_result
, result
)
2487 def test__ip_profile_to_ro_with_ipv6_profile(self
):
2489 "ip-version": "ipv6",
2490 "subnet-address": "2001:0200:0001::/48",
2491 "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2494 "start-address": "2001:0200:0001::0010",
2499 "ip_version": "IPv6",
2500 "subnet_address": "2001:0200:0001::/48",
2501 "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2502 "dhcp_enabled": True,
2503 "dhcp_start_address": "2001:0200:0001::0010",
2507 result
= Ns
._ip
_profile
_to
_ro
(
2508 ip_profile
=ip_profile
,
2511 self
.assertDictEqual(expected_result
, result
)
2513 def test__ip_profile_to_ro_with_dns_server(self
):
2515 "ip-version": "ipv4",
2516 "subnet-address": "192.168.0.0/24",
2517 "gateway-address": "192.168.0.254",
2520 "start-address": "192.168.0.10",
2525 "address": "8.8.8.8",
2528 "address": "1.1.1.1",
2531 "address": "1.0.0.1",
2536 "ip_version": "IPv4",
2537 "subnet_address": "192.168.0.0/24",
2538 "gateway_address": "192.168.0.254",
2539 "dhcp_enabled": True,
2540 "dhcp_start_address": "192.168.0.10",
2542 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2545 result
= Ns
._ip
_profile
_to
_ro
(
2546 ip_profile
=ip_profile
,
2549 self
.assertDictEqual(expected_result
, result
)
2551 def test__ip_profile_to_ro_with_security_group(self
):
2553 "ip-version": "ipv4",
2554 "subnet-address": "192.168.0.0/24",
2555 "gateway-address": "192.168.0.254",
2558 "start-address": "192.168.0.10",
2562 "some-security-group": "here",
2566 "ip_version": "IPv4",
2567 "subnet_address": "192.168.0.0/24",
2568 "gateway_address": "192.168.0.254",
2569 "dhcp_enabled": True,
2570 "dhcp_start_address": "192.168.0.10",
2573 "some-security-group": "here",
2577 result
= Ns
._ip
_profile
_to
_ro
(
2578 ip_profile
=ip_profile
,
2581 self
.assertDictEqual(expected_result
, result
)
2583 def test__ip_profile_to_ro(self
):
2585 "ip-version": "ipv4",
2586 "subnet-address": "192.168.0.0/24",
2587 "gateway-address": "192.168.0.254",
2590 "start-address": "192.168.0.10",
2595 "address": "8.8.8.8",
2598 "address": "1.1.1.1",
2601 "address": "1.0.0.1",
2605 "some-security-group": "here",
2609 "ip_version": "IPv4",
2610 "subnet_address": "192.168.0.0/24",
2611 "gateway_address": "192.168.0.254",
2612 "dhcp_enabled": True,
2613 "dhcp_start_address": "192.168.0.10",
2615 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2617 "some-security-group": "here",
2621 result
= Ns
._ip
_profile
_to
_ro
(
2622 ip_profile
=ip_profile
,
2625 self
.assertDictEqual(expected_result
, result
)
2627 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2628 def test__process_net_params_with_empty_params(
2639 "provider_network": "some-profile-here",
2641 target_record_id
= ""
2644 "net_name": "ns-name-vld-name",
2645 "net_type": "bridge",
2647 "some_ip_profile": "here",
2649 "provider_network_profile": "some-profile-here",
2653 ip_profile_to_ro
.return_value
= {
2654 "some_ip_profile": "here",
2657 result
= Ns
._process
_net
_params
(
2658 target_vld
=target_vld
,
2661 target_record_id
=target_record_id
,
2664 self
.assertDictEqual(expected_result
, result
)
2665 self
.assertTrue(ip_profile_to_ro
.called
)
2667 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2668 def test__process_net_params_with_vim_info_sdn(
2680 "sdn-ports": ["some", "ports", "here"],
2681 "vlds": ["some", "vlds", "here"],
2684 target_record_id
= "vld.sdn.something"
2687 "sdn-ports": ["some", "ports", "here"],
2688 "vlds": ["some", "vlds", "here"],
2693 result
= Ns
._process
_net
_params
(
2694 target_vld
=target_vld
,
2697 target_record_id
=target_record_id
,
2700 self
.assertDictEqual(expected_result
, result
)
2701 self
.assertFalse(ip_profile_to_ro
.called
)
2703 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2704 def test__process_net_params_with_vim_info_sdn_target_vim(
2716 "sdn-ports": ["some", "ports", "here"],
2717 "vlds": ["some", "vlds", "here"],
2718 "target_vim": "some-vim",
2721 target_record_id
= "vld.sdn.something"
2723 "depends_on": ["some-vim vld.sdn"],
2725 "sdn-ports": ["some", "ports", "here"],
2726 "vlds": ["some", "vlds", "here"],
2727 "target_vim": "some-vim",
2732 result
= Ns
._process
_net
_params
(
2733 target_vld
=target_vld
,
2736 target_record_id
=target_record_id
,
2739 self
.assertDictEqual(expected_result
, result
)
2740 self
.assertFalse(ip_profile_to_ro
.called
)
2742 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2743 def test__process_net_params_with_vim_network_name(
2754 "vim_network_name": "some-network-name",
2756 target_record_id
= "vld.sdn.something"
2760 "name": "some-network-name",
2765 result
= Ns
._process
_net
_params
(
2766 target_vld
=target_vld
,
2769 target_record_id
=target_record_id
,
2772 self
.assertDictEqual(expected_result
, result
)
2773 self
.assertFalse(ip_profile_to_ro
.called
)
2775 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2776 def test__process_net_params_with_vim_network_id(
2787 "vim_network_id": "some-network-id",
2789 target_record_id
= "vld.sdn.something"
2793 "id": "some-network-id",
2798 result
= Ns
._process
_net
_params
(
2799 target_vld
=target_vld
,
2802 target_record_id
=target_record_id
,
2805 self
.assertDictEqual(expected_result
, result
)
2806 self
.assertFalse(ip_profile_to_ro
.called
)
2808 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2809 def test__process_net_params_with_mgmt_network(
2816 "mgmt-network": "some-mgmt-network",
2822 target_record_id
= "vld.sdn.something"
2830 result
= Ns
._process
_net
_params
(
2831 target_vld
=target_vld
,
2834 target_record_id
=target_record_id
,
2837 self
.assertDictEqual(expected_result
, result
)
2838 self
.assertFalse(ip_profile_to_ro
.called
)
2840 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2841 def test__process_net_params_with_underlay_eline(
2847 "underlay": "some-underlay-here",
2854 "provider_network": "some-profile-here",
2856 target_record_id
= ""
2860 "some_ip_profile": "here",
2862 "net_name": "ns-name-vld-name",
2864 "provider_network_profile": "some-profile-here",
2868 ip_profile_to_ro
.return_value
= {
2869 "some_ip_profile": "here",
2872 result
= Ns
._process
_net
_params
(
2873 target_vld
=target_vld
,
2876 target_record_id
=target_record_id
,
2879 self
.assertDictEqual(expected_result
, result
)
2880 self
.assertTrue(ip_profile_to_ro
.called
)
2882 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2883 def test__process_net_params_with_underlay_elan(
2889 "underlay": "some-underlay-here",
2896 "provider_network": "some-profile-here",
2898 target_record_id
= ""
2902 "some_ip_profile": "here",
2904 "net_name": "ns-name-vld-name",
2906 "provider_network_profile": "some-profile-here",
2910 ip_profile_to_ro
.return_value
= {
2911 "some_ip_profile": "here",
2914 result
= Ns
._process
_net
_params
(
2915 target_vld
=target_vld
,
2918 target_record_id
=target_record_id
,
2921 self
.assertDictEqual(expected_result
, result
)
2922 self
.assertTrue(ip_profile_to_ro
.called
)
2924 def test__get_cloud_init_exception(self
):
2925 db_mock
= MagicMock(name
="database mock")
2930 with self
.assertRaises(NsException
):
2931 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2933 def test__get_cloud_init_file_fs_exception(self
):
2934 db_mock
= MagicMock(name
="database mock")
2937 location
= "vnfr_id_123456:file:test_file"
2938 db_mock
.get_one
.return_value
= {
2941 "folder": "/home/osm",
2942 "pkg-dir": "vnfr_test_dir",
2947 with self
.assertRaises(NsException
):
2948 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2950 def test__get_cloud_init_file(self
):
2951 db_mock
= MagicMock(name
="database mock")
2952 fs_mock
= MagicMock(name
="filesystem mock")
2953 file_mock
= MagicMock(name
="file mock")
2955 location
= "vnfr_id_123456:file:test_file"
2956 cloud_init_content
= "this is a cloud init file content"
2958 db_mock
.get_one
.return_value
= {
2961 "folder": "/home/osm",
2962 "pkg-dir": "vnfr_test_dir",
2966 fs_mock
.file_open
.return_value
= file_mock
2967 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2969 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2971 self
.assertEqual(cloud_init_content
, result
)
2973 def test__get_cloud_init_vdu(self
):
2974 db_mock
= MagicMock(name
="database mock")
2977 location
= "vnfr_id_123456:vdu:0"
2978 cloud_init_content
= "this is a cloud init file content"
2980 db_mock
.get_one
.return_value
= {
2983 "cloud-init": cloud_init_content
,
2988 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2990 self
.assertEqual(cloud_init_content
, result
)
2992 @patch("jinja2.Environment.__init__")
2993 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2994 cloud_init_content
= None
2998 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
3000 with self
.assertRaises(NsException
):
3002 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3005 @patch("jinja2.Environment.__init__")
3006 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
3007 cloud_init_content
= None
3011 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
3013 with self
.assertRaises(NsException
):
3015 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3018 @patch("jinja2.Environment.__init__")
3019 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
3020 cloud_init_content
= None
3024 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
3026 with self
.assertRaises(NsException
):
3028 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3031 def test_rendering_jinja2_temp_without_special_characters(self
):
3032 cloud_init_content
= """
3035 table_type: {{type}}
3037 overwrite: {{is_override}}
3040 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3044 "is_override": "False",
3045 "command": "; mkdir abc",
3047 context
= "cloud-init for VM"
3048 expected_result
= """
3056 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
3058 result
= Ns
._parse
_jinja
2(
3059 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3061 self
.assertEqual(result
, expected_result
)
3063 def test_rendering_jinja2_temp_with_special_characters(self
):
3064 cloud_init_content
= """
3067 table_type: {{type}}
3069 overwrite: {{is_override}}
3072 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3076 "is_override": "False",
3077 "command": "& rm -rf",
3079 context
= "cloud-init for VM"
3080 expected_result
= """
3088 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3090 result
= Ns
._parse
_jinja
2(
3091 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3093 self
.assertNotEqual(result
, expected_result
)
3095 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
3096 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
3097 mock_environment
.return_value
= Environment(
3098 undefined
=StrictUndefined
,
3099 autoescape
=select_autoescape(default_for_string
=False, default
=False),
3101 cloud_init_content
= """
3104 table_type: {{type}}
3106 overwrite: {{is_override}}
3109 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3113 "is_override": "False",
3114 "command": "& rm -rf /",
3116 context
= "cloud-init for VM"
3117 expected_result
= """
3125 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3127 result
= Ns
._parse
_jinja
2(
3128 cloud_init_content
=cloud_init_content
,
3132 self
.assertEqual(result
, expected_result
)
3134 @patch("osm_ng_ro.ns.Ns._assign_vim")
3135 def test__rebuild_start_stop_task(self
, assign_vim
):
3138 actions
= ["start", "stop", "rebuild"]
3139 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3140 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
3142 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3143 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3145 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3146 t
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3147 for action
in actions
:
3149 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3150 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3151 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3152 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
3153 "status": "SCHEDULED",
3156 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
3157 "target_record_id": t
,
3159 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3163 extra_dict
["params"] = {
3164 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3167 task
= self
.ns
.rebuild_start_stop_task(
3177 self
.assertEqual(task
.get("action_id"), action_id
)
3178 self
.assertEqual(task
.get("nsr_id"), nsr_id
)
3179 self
.assertEqual(task
.get("target_id"), target_vim
)
3180 self
.assertDictEqual(task
, expected_result
)
3182 @patch("osm_ng_ro.ns.Ns._assign_vim")
3183 def test_verticalscale_task(self
, assign_vim
):
3187 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3188 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3190 target_record_id
= (
3191 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3192 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3196 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3197 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3198 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3199 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3200 "status": "SCHEDULED",
3202 "item": "verticalscale",
3203 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3204 "target_record_id": target_record_id
,
3206 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3207 "flavor_dict": "flavor_dict",
3211 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3213 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3216 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3217 extra_dict
["params"] = {
3218 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3219 "flavor_dict": "flavor_dict",
3221 task
= self
.ns
.verticalscale_task(
3222 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3225 self
.assertDictEqual(task
, expected_result
)
3227 @patch("osm_ng_ro.ns.Ns._assign_vim")
3228 def test_migrate_task(self
, assign_vim
):
3232 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3233 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3235 target_record_id
= (
3236 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3237 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3241 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3242 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3243 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3244 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3245 "status": "SCHEDULED",
3248 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3249 "target_record_id": target_record_id
,
3251 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3252 "migrate_host": "migrateToHost",
3256 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3258 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3261 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3262 extra_dict
["params"] = {
3263 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3264 "migrate_host": "migrateToHost",
3266 task
= self
.ns
.migrate_task(
3267 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3270 self
.assertDictEqual(task
, expected_result
)
3273 class TestProcessVduParams(unittest
.TestCase
):
3276 self
.logger
= CopyingMock(autospec
=True)
3278 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3279 def test_find_persistent_root_volumes_empty_instantiation_vol_list(
3280 self
, mock_volume_keeping_required
3282 """Find persistent root volume, instantiation_vol_list is empty."""
3283 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3284 target_vdu
= target_vdu_wth_persistent_storage
3285 vdu_instantiation_volumes_list
= []
3287 mock_volume_keeping_required
.return_value
= True
3288 expected_root_disk
= {
3289 "id": "persistent-root-volume",
3290 "type-of-storage": "persistent-storage:persistent-storage",
3291 "size-of-storage": "10",
3292 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
3294 expected_persist_root_disk
= {
3295 "persistent-root-volume": {
3296 "image_id": "ubuntu20.04",
3301 expected_disk_list
= [
3303 "image_id": "ubuntu20.04",
3308 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3309 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3311 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3312 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3313 self
.assertEqual(disk_list
, expected_disk_list
)
3314 self
.assertEqual(len(disk_list
), 1)
3316 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3317 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
3318 self
, mock_volume_keeping_required
3320 """Find persistent root volume, always selects the first vsd as root volume."""
3321 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3322 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3323 "persistent-volume2",
3324 "persistent-root-volume",
3327 target_vdu
= target_vdu_wth_persistent_storage
3328 vdu_instantiation_volumes_list
= []
3330 mock_volume_keeping_required
.return_value
= True
3331 expected_root_disk
= {
3332 "id": "persistent-volume2",
3333 "type-of-storage": "persistent-storage:persistent-storage",
3334 "size-of-storage": "10",
3336 expected_persist_root_disk
= {
3337 "persistent-volume2": {
3338 "image_id": "ubuntu20.04",
3343 expected_disk_list
= [
3345 "image_id": "ubuntu20.04",
3350 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3351 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3353 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3354 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3355 self
.assertEqual(disk_list
, expected_disk_list
)
3356 self
.assertEqual(len(disk_list
), 1)
3358 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3359 def test_find_persistent_root_volumes_empty_size_of_storage(
3360 self
, mock_volume_keeping_required
3362 """Find persistent root volume, size of storage is empty."""
3363 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3364 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3365 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3366 "persistent-volume2",
3367 "persistent-root-volume",
3370 target_vdu
= target_vdu_wth_persistent_storage
3371 vdu_instantiation_volumes_list
= []
3373 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3374 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3376 self
.assertEqual(persist_root_disk
, None)
3377 mock_volume_keeping_required
.assert_not_called()
3378 self
.assertEqual(disk_list
, [])
3380 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3381 def test_find_persistent_root_volumes_keeping_is_not_required(
3382 self
, mock_volume_keeping_required
3384 """Find persistent root volume, volume keeping is not required."""
3385 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3386 vnfd
["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
3387 {"key": "keep-volume", "value": "false"},
3389 target_vdu
= target_vdu_wth_persistent_storage
3390 vdu_instantiation_volumes_list
= []
3392 mock_volume_keeping_required
.return_value
= False
3393 expected_root_disk
= {
3394 "id": "persistent-root-volume",
3395 "type-of-storage": "persistent-storage:persistent-storage",
3396 "size-of-storage": "10",
3397 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
3399 expected_persist_root_disk
= {
3400 "persistent-root-volume": {
3401 "image_id": "ubuntu20.04",
3406 expected_disk_list
= [
3408 "image_id": "ubuntu20.04",
3413 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3414 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3416 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3417 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3418 self
.assertEqual(disk_list
, expected_disk_list
)
3419 self
.assertEqual(len(disk_list
), 1)
3421 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3422 def test_find_persistent_root_volumes_target_vdu_mismatch(
3423 self
, mock_volume_keeping_required
3425 """Find persistent root volume, target vdu name is not matching."""
3426 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3427 vnfd
["vdu"][0]["name"] = "Several_Volumes-VM"
3428 target_vdu
= target_vdu_wth_persistent_storage
3429 vdu_instantiation_volumes_list
= []
3431 result
= self
.ns
.find_persistent_root_volumes(
3432 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3434 self
.assertEqual(result
, None)
3435 mock_volume_keeping_required
.assert_not_called()
3436 self
.assertEqual(disk_list
, [])
3437 self
.assertEqual(len(disk_list
), 0)
3439 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3440 def test_find_persistent_root_volumes_with_instantiation_vol_list(
3441 self
, mock_volume_keeping_required
3443 """Find persistent root volume, existing volume needs to be used."""
3444 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3445 target_vdu
= target_vdu_wth_persistent_storage
3446 vdu_instantiation_volumes_list
= [
3448 "vim-volume-id": vim_volume_id
,
3449 "name": "persistent-root-volume",
3453 expected_persist_root_disk
= {
3454 "persistent-root-volume": {
3455 "vim_volume_id": vim_volume_id
,
3456 "image_id": "ubuntu20.04",
3459 expected_disk_list
= [
3461 "vim_volume_id": vim_volume_id
,
3462 "image_id": "ubuntu20.04",
3465 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3466 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3468 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3469 mock_volume_keeping_required
.assert_not_called()
3470 self
.assertEqual(disk_list
, expected_disk_list
)
3471 self
.assertEqual(len(disk_list
), 1)
3473 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3474 def test_find_persistent_root_volumes_invalid_instantiation_params(
3475 self
, mock_volume_keeping_required
3477 """Find persistent root volume, existing volume id keyword is invalid."""
3478 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3479 target_vdu
= target_vdu_wth_persistent_storage
3480 vdu_instantiation_volumes_list
= [
3482 "volume-id": vim_volume_id
,
3483 "name": "persistent-root-volume",
3487 with self
.assertRaises(KeyError):
3488 self
.ns
.find_persistent_root_volumes(
3489 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3491 mock_volume_keeping_required
.assert_not_called()
3492 self
.assertEqual(disk_list
, [])
3493 self
.assertEqual(len(disk_list
), 0)
3495 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3496 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3497 self
, mock_volume_keeping_required
3499 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3500 persistent_root_disk
= {
3501 "persistent-root-volume": {
3502 "image_id": "ubuntu20.04",
3507 mock_volume_keeping_required
.return_value
= False
3508 target_vdu
= target_vdu_wth_persistent_storage
3509 vdu_instantiation_volumes_list
= []
3512 "image_id": "ubuntu20.04",
3518 "id": "persistent-volume2",
3519 "size-of-storage": "10",
3520 "type-of-storage": "persistent-storage:persistent-storage",
3522 expected_disk_list
= [
3524 "image_id": "ubuntu20.04",
3533 self
.ns
.find_persistent_volumes(
3534 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3536 self
.assertEqual(disk_list
, expected_disk_list
)
3537 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3539 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3540 def test_find_persistent_volumes_vdu_wth_inst_vol_list(
3541 self
, mock_volume_keeping_required
3543 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3544 persistent_root_disk
= {
3545 "persistent-root-volume": {
3546 "image_id": "ubuntu20.04",
3551 vdu_instantiation_volumes_list
= [
3553 "vim-volume-id": vim_volume_id
,
3554 "name": "persistent-volume2",
3557 target_vdu
= target_vdu_wth_persistent_storage
3560 "image_id": "ubuntu20.04",
3565 expected_disk_list
= [
3567 "image_id": "ubuntu20.04",
3572 "vim_volume_id": vim_volume_id
,
3575 self
.ns
.find_persistent_volumes(
3576 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3578 self
.assertEqual(disk_list
, expected_disk_list
)
3579 mock_volume_keeping_required
.assert_not_called()
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wthout_persistent_storage(
    self, mock_volume_keeping_required
):
    """Find persistent ordinary volume, there is not any persistent disk.

    The call is made with an empty persistent_root_disk dict, an empty
    instantiation volume list and the module-level VDU fixture without
    persistent storage. The test requires that disk_list is still empty
    afterwards and that is_volume_keeping_required is never consulted.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = target_vdu_wthout_persistent_storage
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # Compare against a fresh literal: asserting disk_list against itself can
    # never fail, so it would not notice a volume being appended by mistake.
    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()
3597 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3598 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3599 self
, mock_volume_keeping_required
3601 """There is persistent root disk, but there is not ordinary persistent disk."""
3602 persistent_root_disk
= {
3603 "persistent-root-volume": {
3604 "image_id": "ubuntu20.04",
3609 vdu_instantiation_volumes_list
= []
3610 mock_volume_keeping_required
.return_value
= False
3611 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3612 target_vdu
["virtual-storages"] = [
3614 "id": "persistent-root-volume",
3615 "size-of-storage": "10",
3616 "type-of-storage": "persistent-storage:persistent-storage",
3617 "vdu-storage-requirements": [
3618 {"key": "keep-volume", "value": "true"},
3622 "id": "ephemeral-volume",
3623 "size-of-storage": "1",
3624 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3629 "image_id": "ubuntu20.04",
3634 self
.ns
.find_persistent_volumes(
3635 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3637 self
.assertEqual(disk_list
, disk_list
)
3638 mock_volume_keeping_required
.assert_not_called()
3640 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3641 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
3642 self
, mock_volume_keeping_required
3644 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3645 vim-volume-id is given as instantiation parameter but disk id is not matching.
3647 mock_volume_keeping_required
.return_value
= True
3648 vdu_instantiation_volumes_list
= [
3650 "vim-volume-id": vim_volume_id
,
3651 "name": "persistent-volume3",
3654 persistent_root_disk
= {
3655 "persistent-root-volume": {
3656 "image_id": "ubuntu20.04",
3663 "image_id": "ubuntu20.04",
3668 expected_disk_list
= [
3670 "image_id": "ubuntu20.04",
3680 "id": "persistent-volume2",
3681 "size-of-storage": "10",
3682 "type-of-storage": "persistent-storage:persistent-storage",
3684 target_vdu
= target_vdu_wth_persistent_storage
3685 self
.ns
.find_persistent_volumes(
3686 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3688 self
.assertEqual(disk_list
, expected_disk_list
)
3689 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
def test_is_volume_keeping_required_true(self):
    """A descriptor carrying keep-volume "true" is reported as keep-required."""
    keep_requirement = {"key": "keep-volume", "value": "true"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), True)
def test_is_volume_keeping_required_false(self):
    """A descriptor carrying keep-volume "false" is reported as not keep-required."""
    keep_requirement = {"key": "keep-volume", "value": "false"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), False)
def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """A descriptor with no vdu-storage-requirements key is not keep-required."""
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), False)
def test_is_volume_keeping_required_wrong_keyword(self):
    """A requirement keyed "hold-volume" instead of "keep-volume" is not keep-required."""
    misnamed_requirement = {"key": "hold-volume", "value": "true"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [misnamed_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), False)
3740 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3741 """Interfaces are sorted according to position, all have positions."""
3742 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3743 target_vdu
["interfaces"] = [
3746 "ns-vld-id": "datanet",
3751 "ns-vld-id": "mgmtnet",
3755 sorted_interfaces
= [
3758 "ns-vld-id": "mgmtnet",
3763 "ns-vld-id": "datanet",
3767 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3768 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3770 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3771 """Interfaces are sorted according to position, some of them have positions."""
3772 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3773 target_vdu
["interfaces"] = [
3776 "ns-vld-id": "mgmtnet",
3780 "ns-vld-id": "datanet",
3784 sorted_interfaces
= [
3787 "ns-vld-id": "datanet",
3792 "ns-vld-id": "mgmtnet",
3795 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3796 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting a VDU whose interface list is empty leaves the list empty."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []
    self.ns._sort_vdu_interfaces(vdu)
    self.assertEqual(vdu["interfaces"], [])
3806 def test_partially_locate_vdu_interfaces(self
):
3807 """Some interfaces have positions."""
3808 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3809 target_vdu
["interfaces"] = [
3812 "ns-vld-id": "net1",
3814 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3817 "ns-vld-id": "mgmtnet",
3821 "ns-vld-id": "datanet",
3825 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3826 self
.assertDictEqual(
3827 target_vdu
["interfaces"][0],
3830 "ns-vld-id": "datanet",
3834 self
.assertDictEqual(
3835 target_vdu
["interfaces"][2],
3836 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3839 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3840 """Some interfaces have positions, position start from 0."""
3841 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3842 target_vdu
["interfaces"] = [
3845 "ns-vld-id": "net1",
3847 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3850 "ns-vld-id": "mgmtnet",
3854 "ns-vld-id": "datanet",
3858 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3859 self
.assertDictEqual(
3860 target_vdu
["interfaces"][0],
3863 "ns-vld-id": "datanet",
3867 self
.assertDictEqual(
3868 target_vdu
["interfaces"][3],
3869 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """With no positions on any interface, the interface list stays as it was."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = interfaces_wthout_positions
    # Snapshot taken before the call so the comparison is against the input.
    interfaces_before = deepcopy(vdu["interfaces"])
    self.ns._partially_locate_vdu_interfaces(vdu)
    self.assertEqual(vdu["interfaces"], interfaces_before)
3880 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3881 """All interfaces have position."""
3882 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3883 target_vdu
["interfaces"] = interfaces_wth_all_positions
3884 expected_interfaces
= [
3887 "ns-vld-id": "net2",
3892 "ns-vld-id": "mgmtnet",
3897 "ns-vld-id": "net1",
3901 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3902 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3904 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3905 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3906 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3907 """Target_vdu has cloud-init and boot-data-drive."""
3908 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3909 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3910 target_vdu
["boot-data-drive"] = "vda"
3912 mock_get_cloud_init
.return_value
= cloud_init_content
3913 mock_parse_jinja2
.return_value
= user_data
3915 "user-data": user_data
,
3916 "boot-data-drive": "vda",
3918 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3919 self
.assertDictEqual(result
, expected_result
)
3920 mock_get_cloud_init
.assert_called_once_with(
3921 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3923 mock_parse_jinja2
.assert_called_once_with(
3924 cloud_init_content
=cloud_init_content
,
3926 context
="sample-cloud-init-path",
3929 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3930 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3931 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3932 self
, mock_parse_jinja2
, mock_get_cloud_init
3934 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3935 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3936 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3937 target_vdu
["boot-data-drive"] = "vda"
3939 mock_get_cloud_init
.side_effect
= NsException(
3940 "Mismatch descriptor for cloud init."
3943 with self
.assertRaises(NsException
) as err
:
3944 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3945 self
.assertEqual(str(err
.exception
), "Mismatch descriptor for cloud init.")
3947 mock_get_cloud_init
.assert_called_once_with(
3948 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3950 mock_parse_jinja2
.assert_not_called()
3952 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3953 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3954 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3955 self
, mock_parse_jinja2
, mock_get_cloud_init
3957 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3958 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3959 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3960 target_vdu
["boot-data-drive"] = "vda"
3962 mock_get_cloud_init
.return_value
= cloud_init_content
3963 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3965 with self
.assertRaises(NsException
) as err
:
3966 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3967 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3968 mock_get_cloud_init
.assert_called_once_with(
3969 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3971 mock_parse_jinja2
.assert_called_once_with(
3972 cloud_init_content
=cloud_init_content
,
3974 context
="sample-cloud-init-path",
3977 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3978 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3979 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3980 self
, mock_parse_jinja2
, mock_get_cloud_init
3982 """Target_vdu has cloud-init but do not have boot-data-drive."""
3983 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3984 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3986 mock_get_cloud_init
.return_value
= cloud_init_content
3987 mock_parse_jinja2
.return_value
= user_data
3989 "user-data": user_data
,
3991 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3992 self
.assertDictEqual(result
, expected_result
)
3993 mock_get_cloud_init
.assert_called_once_with(
3994 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3996 mock_parse_jinja2
.assert_called_once_with(
3997 cloud_init_content
=cloud_init_content
,
3999 context
="sample-cloud-init-path",
4002 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4003 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4004 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
4005 self
, mock_parse_jinja2
, mock_get_cloud_init
4007 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
4008 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4009 target_vdu
["cloud-init"] = "sample-cloud-init-path"
4010 target_vdu
["boot-data-drive"] = "vda"
4011 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
4012 mock_parse_jinja2
.return_value
= user_data
4014 "user-data": user_data
,
4015 "boot-data-drive": "vda",
4017 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4018 self
.assertDictEqual(result
, expected_result
)
4019 mock_get_cloud_init
.assert_not_called()
4020 mock_parse_jinja2
.assert_called_once_with(
4021 cloud_init_content
=cloud_init_content
,
4023 context
="sample-cloud-init-path",
4026 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4027 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4028 def test_prepare_vdu_cloud_init_no_cloud_init(
4029 self
, mock_parse_jinja2
, mock_get_cloud_init
4031 """Target_vdu do not have cloud-init."""
4032 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4033 target_vdu
["boot-data-drive"] = "vda"
4036 "boot-data-drive": "vda",
4038 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4039 self
.assertDictEqual(result
, expected_result
)
4040 mock_get_cloud_init
.assert_not_called()
4041 mock_parse_jinja2
.assert_not_called()
4043 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
4044 """ns_vld and vnf_vld both exist."""
4047 "ns-vld-id": "mgmtnet",
4048 "vnf-vld-id": "mgmt_cp_int",
4050 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
4051 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4052 interface
, ns_preffix
, vnf_preffix
4054 self
.assertEqual(result
, expected_result
)
4056 def test_check_vld_information_of_interfaces_empty_interfaces(self
):
4057 """Interface dict is empty."""
4059 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4060 interface
, ns_preffix
, vnf_preffix
4062 self
.assertEqual(result
, "")
4064 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
4065 """Interface dict has only vnf_vld."""
4068 "vnf-vld-id": "mgmt_cp_int",
4070 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
4071 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4072 interface
, ns_preffix
, vnf_preffix
4074 self
.assertEqual(result
, expected_result
)
4076 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
4079 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
4082 "vnf-vld-id": "mgmt_cp_int",
4085 with self
.assertRaises(Exception) as err
:
4086 self
.ns
._check
_vld
_information
_of
_interfaces
(
4087 interface
, ns_preffix
, vnf_preffix
4089 self
.assertEqual(type(err
), TypeError)
4091 def test_prepare_interface_port_security_has_security_details(self
):
4092 """Interface dict has port security details."""
4095 "ns-vld-id": "mgmtnet",
4096 "vnf-vld-id": "mgmt_cp_int",
4097 "port-security-enabled": True,
4098 "port-security-disable-strategy": "allow-address-pairs",
4100 expected_interface
= {
4102 "ns-vld-id": "mgmtnet",
4103 "vnf-vld-id": "mgmt_cp_int",
4104 "port_security": True,
4105 "port_security_disable_strategy": "allow-address-pairs",
4107 self
.ns
._prepare
_interface
_port
_security
(interface
)
4108 self
.assertDictEqual(interface
, expected_interface
)
4110 def test_prepare_interface_port_security_empty_interfaces(self
):
4111 """Interface dict is empty."""
4113 expected_interface
= {}
4114 self
.ns
._prepare
_interface
_port
_security
(interface
)
4115 self
.assertDictEqual(interface
, expected_interface
)
4117 def test_prepare_interface_port_security_wthout_port_security(self
):
4118 """Interface dict does not have port security details."""
4121 "ns-vld-id": "mgmtnet",
4122 "vnf-vld-id": "mgmt_cp_int",
4124 expected_interface
= {
4126 "ns-vld-id": "mgmtnet",
4127 "vnf-vld-id": "mgmt_cp_int",
4129 self
.ns
._prepare
_interface
_port
_security
(interface
)
4130 self
.assertDictEqual(interface
, expected_interface
)
4132 def test_create_net_item_of_interface_floating_ip_port_security(self
):
4133 """Interface dict has floating ip, port-security details."""
4136 "vcpi": "sample_vcpi",
4137 "port_security": True,
4138 "port_security_disable_strategy": "allow-address-pairs",
4139 "floating_ip": "10.1.1.12",
4140 "ns-vld-id": "mgmtnet",
4141 "vnf-vld-id": "mgmt_cp_int",
4143 net_text
= f
"{ns_preffix}"
4144 expected_net_item
= {
4146 "port_security": True,
4147 "port_security_disable_strategy": "allow-address-pairs",
4148 "floating_ip": "10.1.1.12",
4149 "net_id": f
"TASK-{ns_preffix}",
4152 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4153 self
.assertDictEqual(result
, expected_net_item
)
4155 def test_create_net_item_of_interface_invalid_net_text(self
):
4156 """net-text is invalid."""
4159 "vcpi": "sample_vcpi",
4160 "port_security": True,
4161 "port_security_disable_strategy": "allow-address-pairs",
4162 "floating_ip": "10.1.1.12",
4163 "ns-vld-id": "mgmtnet",
4164 "vnf-vld-id": "mgmt_cp_int",
4167 with self
.assertRaises(TypeError):
4168 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4170 def test_create_net_item_of_interface_empty_interface(self
):
4171 """Interface dict is empty."""
4173 net_text
= ns_preffix
4174 expected_net_item
= {
4175 "net_id": f
"TASK-{ns_preffix}",
4178 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4179 self
.assertDictEqual(result
, expected_net_item
)
4181 @patch("osm_ng_ro.ns.deep_get")
4182 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
4183 """Interface type is SR-IOV."""
4186 "vcpi": "sample_vcpi",
4187 "port_security": True,
4188 "port_security_disable_strategy": "allow-address-pairs",
4189 "floating_ip": "10.1.1.12",
4190 "ns-vld-id": "mgmtnet",
4191 "vnf-vld-id": "mgmt_cp_int",
4194 mock_deep_get
.return_value
= "SR-IOV"
4195 net_text
= ns_preffix
4197 expected_net_item
= {
4202 self
.ns
._prepare
_type
_of
_interface
(
4203 interface
, tasks_by_target_record_id
, net_text
, net_item
4205 self
.assertDictEqual(net_item
, expected_net_item
)
4208 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4210 mock_deep_get
.assert_called_once_with(
4211 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4214 @patch("osm_ng_ro.ns.deep_get")
4215 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4218 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4221 "vcpi": "sample_vcpi",
4222 "port_security": True,
4223 "port_security_disable_strategy": "allow-address-pairs",
4224 "floating_ip": "10.1.1.12",
4225 "ns-vld-id": "mgmtnet",
4226 "vnf-vld-id": "mgmt_cp_int",
4227 "type": "PCI-PASSTHROUGH",
4229 mock_deep_get
.return_value
= {}
4230 tasks_by_target_record_id
= {}
4231 net_text
= ns_preffix
4233 expected_net_item
= {
4235 "model": "PCI-PASSTHROUGH",
4236 "type": "PCI-PASSTHROUGH",
4238 self
.ns
._prepare
_type
_of
_interface
(
4239 interface
, tasks_by_target_record_id
, net_text
, net_item
4241 self
.assertDictEqual(net_item
, expected_net_item
)
4242 mock_deep_get
.assert_called_once_with(
4243 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4246 @patch("osm_ng_ro.ns.deep_get")
4247 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4248 """Interface type is mgmt."""
4251 "vcpi": "sample_vcpi",
4252 "port_security": True,
4253 "port_security_disable_strategy": "allow-address-pairs",
4254 "floating_ip": "10.1.1.12",
4255 "ns-vld-id": "mgmtnet",
4256 "vnf-vld-id": "mgmt_cp_int",
4259 tasks_by_target_record_id
= {}
4260 net_text
= ns_preffix
4262 expected_net_item
= {
4265 self
.ns
._prepare
_type
_of
_interface
(
4266 interface
, tasks_by_target_record_id
, net_text
, net_item
4268 self
.assertDictEqual(net_item
, expected_net_item
)
4269 mock_deep_get
.assert_not_called()
4271 @patch("osm_ng_ro.ns.deep_get")
4272 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4273 """Interface type is bridge."""
4276 "vcpi": "sample_vcpi",
4277 "port_security": True,
4278 "port_security_disable_strategy": "allow-address-pairs",
4279 "floating_ip": "10.1.1.12",
4280 "ns-vld-id": "mgmtnet",
4281 "vnf-vld-id": "mgmt_cp_int",
4283 tasks_by_target_record_id
= {}
4284 net_text
= ns_preffix
4286 expected_net_item
= {
4290 self
.ns
._prepare
_type
_of
_interface
(
4291 interface
, tasks_by_target_record_id
, net_text
, net_item
4293 self
.assertDictEqual(net_item
, expected_net_item
)
4294 mock_deep_get
.assert_not_called()
4296 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4297 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4298 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4299 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4300 def test_prepare_vdu_interfaces(
4302 mock_type_of_interface
,
4303 mock_item_of_interface
,
4305 mock_vld_information_of_interface
,
4307 """Prepare vdu interfaces successfully."""
4308 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4311 "ns-vld-id": "net1",
4312 "ip-address": "13.2.12.31",
4313 "mgmt-interface": True,
4317 "vnf-vld-id": "net2",
4318 "mac-address": "d0:94:66:ed:fc:e2",
4322 "ns-vld-id": "mgmtnet",
4324 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4326 "params": "test_params",
4327 "find_params": "test_find_params",
4331 net_text_1
= f
"{ns_preffix}:net1"
4332 net_text_2
= f
"{vnf_preffix}:net2"
4333 net_text_3
= f
"{ns_preffix}:mgmtnet"
4336 "net_id": f
"TASK-{ns_preffix}",
4341 "net_id": f
"TASK-{ns_preffix}",
4346 "net_id": f
"TASK-{ns_preffix}",
4349 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4350 mock_vld_information_of_interface
.side_effect
= [
4356 expected_extra_dict
= {
4357 "params": "test_params",
4358 "find_params": "test_find_params",
4359 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4360 "mgmt_vdu_interface": 0,
4362 updated_net_item1
= deepcopy(net_item_1
)
4363 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4364 updated_net_item2
= deepcopy(net_item_2
)
4365 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4366 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4367 self
.ns
._prepare
_vdu
_interfaces
(
4373 tasks_by_target_record_id
,
4376 _call_mock_vld_information_of_interface
= (
4377 mock_vld_information_of_interface
.call_args_list
4380 _call_mock_vld_information_of_interface
[0][0],
4381 (interface_1
, ns_preffix
, vnf_preffix
),
4384 _call_mock_vld_information_of_interface
[1][0],
4385 (interface_2
, ns_preffix
, vnf_preffix
),
4388 _call_mock_vld_information_of_interface
[2][0],
4389 (interface_3
, ns_preffix
, vnf_preffix
),
4392 _call_mock_port_security
= mock_port_security
.call_args_list
4393 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4394 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4395 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4397 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4398 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4399 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4400 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4402 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4404 _call_mock_type_of_interface
[0][0],
4405 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4408 _call_mock_type_of_interface
[1][0],
4409 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4412 _call_mock_type_of_interface
[2][0],
4413 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4415 self
.assertEqual(net_list
, expected_net_list
)
4416 self
.assertEqual(extra_dict
, expected_extra_dict
)
4417 self
.logger
.error
.assert_not_called()
4419 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4420 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4421 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4422 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4423 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4425 mock_type_of_interface
,
4426 mock_item_of_interface
,
4428 mock_vld_information_of_interface
,
4430 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4431 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4434 "ns-vld-id": "net1",
4435 "ip-address": "13.2.12.31",
4436 "mgmt-interface": True,
4440 "vnf-vld-id": "net2",
4441 "mac-address": "d0:94:66:ed:fc:e2",
4445 "ns-vld-id": "mgmtnet",
4447 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4449 "params": "test_params",
4450 "find_params": "test_find_params",
4453 net_text_1
= f
"{ns_preffix}:net1"
4454 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4456 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4458 expected_extra_dict
= {
4459 "params": "test_params",
4460 "find_params": "test_find_params",
4461 "depends_on": [net_text_1
],
4463 with self
.assertRaises(TypeError):
4464 self
.ns
._prepare
_vdu
_interfaces
(
4470 tasks_by_target_record_id
,
4474 _call_mock_vld_information_of_interface
= (
4475 mock_vld_information_of_interface
.call_args_list
4478 _call_mock_vld_information_of_interface
[0][0],
4479 (interface_1
, ns_preffix
, vnf_preffix
),
4482 _call_mock_port_security
= mock_port_security
.call_args_list
4483 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4485 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4486 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4488 mock_type_of_interface
.assert_not_called()
4489 self
.logger
.error
.assert_not_called()
4490 self
.assertEqual(net_list
, [])
4491 self
.assertEqual(extra_dict
, expected_extra_dict
)
4493 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4494 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4495 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4496 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4497 def test_prepare_vdu_interfaces_vld_information_is_empty(
4499 mock_type_of_interface
,
4500 mock_item_of_interface
,
4502 mock_vld_information_of_interface
,
4504 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4505 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4508 "ns-vld-id": "net1",
4509 "ip-address": "13.2.12.31",
4510 "mgmt-interface": True,
4514 "vnf-vld-id": "net2",
4515 "mac-address": "d0:94:66:ed:fc:e2",
4519 "ns-vld-id": "mgmtnet",
4521 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4523 "params": "test_params",
4524 "find_params": "test_find_params",
4527 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4529 self
.ns
._prepare
_vdu
_interfaces
(
4535 tasks_by_target_record_id
,
4539 _call_mock_vld_information_of_interface
= (
4540 mock_vld_information_of_interface
.call_args_list
4543 _call_mock_vld_information_of_interface
[0][0],
4544 (interface_1
, ns_preffix
, vnf_preffix
),
4547 _call_mock_vld_information_of_interface
[1][0],
4548 (interface_2
, ns_preffix
, vnf_preffix
),
4551 _call_mock_vld_information_of_interface
[2][0],
4552 (interface_3
, ns_preffix
, vnf_preffix
),
4555 _call_logger
= self
.logger
.error
.call_args_list
4558 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4562 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4566 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4568 self
.assertEqual(net_list
, [])
4572 "params": "test_params",
4573 "find_params": "test_find_params",
4578 mock_item_of_interface
.assert_not_called()
4579 mock_port_security
.assert_not_called()
4580 mock_type_of_interface
.assert_not_called()
4582 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4583 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4584 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4585 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4586 def test_prepare_vdu_interfaces_empty_interface_list(
4588 mock_type_of_interface
,
4589 mock_item_of_interface
,
4591 mock_vld_information_of_interface
,
4593 """Prepare vdu interfaces, interface list is empty."""
4594 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4595 target_vdu
["interfaces"] = []
4598 self
.ns
._prepare
_vdu
_interfaces
(
4604 tasks_by_target_record_id
,
4607 mock_type_of_interface
.assert_not_called()
4608 mock_vld_information_of_interface
.assert_not_called()
4609 mock_item_of_interface
.assert_not_called()
4610 mock_port_security
.assert_not_called()
def test_prepare_vdu_ssh_keys(self):
    """VDU has ssh-keys and requires ssh access: both keys end up in key-pairs."""
    ro_nsr_public_key = {"public_key": "path_of_public_key"}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-keys"] = ["sample-ssh-key"]
    vdu["ssh-access-required"] = True
    wanted_config = {
        "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]
    }
    # The method under test fills this dict in place.
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, ro_nsr_public_key, cloud_config)
    self.assertDictEqual(cloud_config, wanted_config)
def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """VDU has no ssh-keys but requires ssh access: only the RO public key is added."""
    ro_nsr_public_key = {"public_key": "path_of_public_key"}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-access-required"] = True
    wanted_config = {"key-pairs": [{"public_key": "path_of_public_key"}]}
    # The method under test fills this dict in place.
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, ro_nsr_public_key, cloud_config)
    self.assertDictEqual(cloud_config, wanted_config)
def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """VDU has ssh-keys but ssh access is not required: only the VDU key is added."""
    ro_nsr_public_key = {"public_key": "path_of_public_key"}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-keys"] = ["sample-ssh-key"]
    vdu["ssh-access-required"] = False
    wanted_config = {"key-pairs": ["sample-ssh-key"]}
    # The method under test fills this dict in place.
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, ro_nsr_public_key, cloud_config)
    self.assertDictEqual(cloud_config, wanted_config)
4646 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4647 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4648 def test_add_persistent_root_disk_to_disk_list_keep_false(
4649 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4651 """Add persistent root disk to disk_list, keep volume set to False."""
4653 "id": "persistent-root-volume",
4654 "type-of-storage": "persistent-storage:persistent-storage",
4655 "size-of-storage": "10",
4657 mock_select_persistent_root_disk
.return_value
= root_disk
4658 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4659 vnfd
["virtual-storage-desc"][1] = root_disk
4660 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4661 persistent_root_disk
= {}
4663 mock_volume_keeping_required
.return_value
= False
4664 expected_disk_list
= [
4666 "image_id": "ubuntu20.04",
4671 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4672 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4674 self
.assertEqual(disk_list
, expected_disk_list
)
4675 mock_select_persistent_root_disk
.assert_called_once()
4676 mock_volume_keeping_required
.assert_called_once()
4678 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4679 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4680 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4681 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4683 """Add persistent root disk to disk_list"""
4685 "id": "persistent-root-volume",
4686 "type-of-storage": "persistent-storage:persistent-storage",
4687 "size-of-storage": "10",
4689 mock_select_persistent_root_disk
.side_effect
= AttributeError
4690 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4691 vnfd
["virtual-storage-desc"][1] = root_disk
4692 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4693 persistent_root_disk
= {}
4695 with self
.assertRaises(AttributeError):
4696 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4697 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4699 self
.assertEqual(disk_list
, [])
4700 mock_select_persistent_root_disk
.assert_called_once()
4701 mock_volume_keeping_required
.assert_not_called()
4703 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4704 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4705 def test_add_persistent_root_disk_to_disk_list_keep_true(
4706 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4708 """Add persistent root disk, keeo volume set to True."""
4709 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4710 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4711 mock_volume_keeping_required
.return_value
= True
4713 "id": "persistent-root-volume",
4714 "type-of-storage": "persistent-storage:persistent-storage",
4715 "size-of-storage": "10",
4716 "vdu-storage-requirements": [
4717 {"key": "keep-volume", "value": "true"},
4720 mock_select_persistent_root_disk
.return_value
= root_disk
4721 persistent_root_disk
= {}
4723 expected_disk_list
= [
4725 "image_id": "ubuntu20.04",
4730 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4731 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4733 self
.assertEqual(disk_list
, expected_disk_list
)
4734 mock_volume_keeping_required
.assert_called_once_with(root_disk
)
4736 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4737 def test_add_persistent_ordinary_disk_to_disk_list(
4738 self
, mock_volume_keeping_required
4740 """Add persistent ordinary disk, keeo volume set to True."""
4741 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4742 mock_volume_keeping_required
.return_value
= False
4743 persistent_root_disk
= {
4744 "persistent-root-volume": {
4745 "image_id": "ubuntu20.04",
4751 "id": "persistent-volume2",
4752 "type-of-storage": "persistent-storage:persistent-storage",
4753 "size-of-storage": "10",
4755 persistent_ordinary_disk
= {}
4757 expected_disk_list
= [
4763 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4764 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4766 self
.assertEqual(disk_list
, expected_disk_list
)
4767 mock_volume_keeping_required
.assert_called_once_with(ordinary_disk
)
4769 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4770 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
4771 self
, mock_volume_keeping_required
4773 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4774 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4775 mock_volume_keeping_required
.return_value
= False
4776 persistent_root_disk
= {
4777 "persistent-root-volume": {
4778 "image_id": "ubuntu20.04",
4782 "persistent-volume2": {
4786 persistent_ordinary_disk
= {}
4789 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4790 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4792 self
.assertEqual(disk_list
, [])
4793 mock_volume_keeping_required
.assert_not_called()
4795 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4796 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4797 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
4798 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4800 """VNFD does not have persistent storage."""
4801 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4802 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4803 mock_select_persistent_root_disk
.return_value
= None
4804 persistent_root_disk
= {}
4806 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4807 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4809 self
.assertEqual(disk_list
, [])
4810 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4811 mock_volume_keeping_required
.assert_not_called()
4813 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4814 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4815 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
4816 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4818 """Persistent_root_disk dict is empty."""
4819 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4820 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4821 mock_select_persistent_root_disk
.return_value
= None
4822 persistent_root_disk
= {}
4824 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4825 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4827 self
.assertEqual(disk_list
, [])
4828 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4829 mock_volume_keeping_required
.assert_not_called()
4831 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self
):
4832 """Invalid extra dict."""
4833 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4834 target_vdu
["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4836 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4837 with self
.assertRaises(NsException
) as err
:
4838 self
.ns
._prepare
_vdu
_affinity
_group
_list
(target_vdu
, extra_dict
, ns_preffix
)
4839 self
.assertEqual(str(err
.exception
), "Invalid extra_dict format.")
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A VDU that references exactly one affinity group.

    The helper must return a single ``affinity_group_id`` entry prefixed with
    ``TASK-`` and add the matching record id to ``extra_dict["depends_on"]``.
    """
    group_record_id = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
    ns_prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    dependencies = {"depends_on": []}
    wanted_groups = [{"affinity_group_id": "TASK-" + group_record_id}]
    wanted_dependencies = {"depends_on": [group_record_id]}

    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]

    # The helper mutates ``dependencies`` in place and returns the group list.
    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, ns_prefix
    )

    self.assertDictEqual(dependencies, wanted_dependencies)
    self.assertEqual(affinity_groups, wanted_groups)
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """A VDU that references two affinity groups.

    Both groups must show up, in order, in the returned list (as ``TASK-``
    prefixed ids) and in ``extra_dict["depends_on"]`` (as plain record ids).
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    dependencies = {"depends_on": []}
    ns_prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    record_ids = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2",
    ]
    wanted_groups = [
        {"affinity_group_id": "TASK-" + record_id} for record_id in record_ids
    ]
    wanted_dependencies = {"depends_on": list(record_ids)}

    # The helper mutates ``dependencies`` in place and returns the group list.
    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, ns_prefix
    )

    self.assertDictEqual(dependencies, wanted_dependencies)
    self.assertEqual(affinity_groups, wanted_groups)
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """No affinity group is configured on the VDU by this test.

    The helper must return an empty list and leave
    ``extra_dict["depends_on"]`` empty.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    dependencies = {"depends_on": []}
    ns_prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, ns_prefix
    )

    self.assertDictEqual(dependencies, {"depends_on": []})
    self.assertEqual(affinity_groups, [])
4889 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4890 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4891 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4892 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4893 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4894 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4895 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4896 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4897 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4898 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4899 def test_process_vdu_params_with_inst_vol_list(
4901 mock_prepare_vdu_affinity_group_list
,
4902 mock_add_persistent_ordinary_disks_to_disk_list
,
4903 mock_add_persistent_root_disk_to_disk_list
,
4904 mock_find_persistent_volumes
,
4905 mock_find_persistent_root_volumes
,
4906 mock_prepare_vdu_ssh_keys
,
4907 mock_prepare_vdu_cloud_init
,
4908 mock_prepare_vdu_interfaces
,
4909 mock_locate_vdu_interfaces
,
4910 mock_sort_vdu_interfaces
,
4912 """Instantiation volume list is empty."""
4913 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4915 target_vdu
["interfaces"] = interfaces_wth_all_positions
4917 vdu_instantiation_vol_list
= [
4919 "vim-volume-id": vim_volume_id
,
4920 "name": "persistent-volume2",
4923 target_vdu
["additionalParams"] = {
4924 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4926 mock_prepare_vdu_cloud_init
.return_value
= {}
4927 mock_prepare_vdu_affinity_group_list
.return_value
= []
4928 persistent_root_disk
= {
4929 "persistent-root-volume": {
4930 "image_id": "ubuntu20.04",
4934 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4936 new_kwargs
= deepcopy(kwargs
)
4941 "tasks_by_target_record_id": {},
4945 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4946 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4947 db
.get_one
.return_value
= vnfd
4948 result
= Ns
._process
_vdu
_params
(
4949 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4951 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4952 mock_locate_vdu_interfaces
.assert_not_called()
4953 mock_prepare_vdu_cloud_init
.assert_called_once()
4954 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4955 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4956 mock_prepare_vdu_interfaces
.assert_called_once_with(
4958 expected_extra_dict_copy
,
4965 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4966 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4967 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4968 mock_find_persistent_volumes
.assert_called_once_with(
4969 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
4972 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4973 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4974 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4975 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4976 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4977 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4978 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4979 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4980 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4981 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4982 def test_process_vdu_params_wth_affinity_groups(
4984 mock_prepare_vdu_affinity_group_list
,
4985 mock_add_persistent_ordinary_disks_to_disk_list
,
4986 mock_add_persistent_root_disk_to_disk_list
,
4987 mock_find_persistent_volumes
,
4988 mock_find_persistent_root_volumes
,
4989 mock_prepare_vdu_ssh_keys
,
4990 mock_prepare_vdu_cloud_init
,
4991 mock_prepare_vdu_interfaces
,
4992 mock_locate_vdu_interfaces
,
4993 mock_sort_vdu_interfaces
,
4995 """There is cloud-config."""
4996 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4999 target_vdu
["interfaces"] = interfaces_wth_all_positions
5000 mock_prepare_vdu_cloud_init
.return_value
= {}
5001 mock_prepare_vdu_affinity_group_list
.return_value
= [
5006 new_kwargs
= deepcopy(kwargs
)
5011 "tasks_by_target_record_id": {},
5015 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5016 expected_extra_dict3
["params"]["affinity_group_list"] = [
5020 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5021 db
.get_one
.return_value
= vnfd
5022 result
= Ns
._process
_vdu
_params
(
5023 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5025 self
.assertDictEqual(result
, expected_extra_dict3
)
5026 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5027 mock_locate_vdu_interfaces
.assert_not_called()
5028 mock_prepare_vdu_cloud_init
.assert_called_once()
5029 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5030 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5031 mock_prepare_vdu_interfaces
.assert_called_once_with(
5033 expected_extra_dict3
,
5041 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5042 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5043 mock_find_persistent_volumes
.assert_not_called()
5045 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5046 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5047 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5048 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5049 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5050 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5051 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5052 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5053 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5054 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5055 def test_process_vdu_params_wth_cloud_config(
5057 mock_prepare_vdu_affinity_group_list
,
5058 mock_add_persistent_ordinary_disks_to_disk_list
,
5059 mock_add_persistent_root_disk_to_disk_list
,
5060 mock_find_persistent_volumes
,
5061 mock_find_persistent_root_volumes
,
5062 mock_prepare_vdu_ssh_keys
,
5063 mock_prepare_vdu_cloud_init
,
5064 mock_prepare_vdu_interfaces
,
5065 mock_locate_vdu_interfaces
,
5066 mock_sort_vdu_interfaces
,
5068 """There is cloud-config."""
5069 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5072 target_vdu
["interfaces"] = interfaces_wth_all_positions
5073 mock_prepare_vdu_cloud_init
.return_value
= {
5074 "user-data": user_data
,
5075 "boot-data-drive": "vda",
5077 mock_prepare_vdu_affinity_group_list
.return_value
= []
5079 new_kwargs
= deepcopy(kwargs
)
5084 "tasks_by_target_record_id": {},
5088 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5089 expected_extra_dict3
["params"]["cloud_config"] = {
5090 "user-data": user_data
,
5091 "boot-data-drive": "vda",
5093 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5094 db
.get_one
.return_value
= vnfd
5095 result
= Ns
._process
_vdu
_params
(
5096 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5098 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5099 mock_locate_vdu_interfaces
.assert_not_called()
5100 mock_prepare_vdu_cloud_init
.assert_called_once()
5101 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5102 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5103 mock_prepare_vdu_interfaces
.assert_called_once_with(
5105 expected_extra_dict3
,
5112 self
.assertDictEqual(result
, expected_extra_dict3
)
5113 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
5114 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
5116 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5117 mock_find_persistent_volumes
.assert_not_called()
5119 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5120 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5121 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5122 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5123 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5124 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5125 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5126 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5127 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5128 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5129 def test_process_vdu_params_wthout_persistent_storage(
5131 mock_prepare_vdu_affinity_group_list
,
5132 mock_add_persistent_ordinary_disks_to_disk_list
,
5133 mock_add_persistent_root_disk_to_disk_list
,
5134 mock_find_persistent_volumes
,
5135 mock_find_persistent_root_volumes
,
5136 mock_prepare_vdu_ssh_keys
,
5137 mock_prepare_vdu_cloud_init
,
5138 mock_prepare_vdu_interfaces
,
5139 mock_locate_vdu_interfaces
,
5140 mock_sort_vdu_interfaces
,
5142 """There is not any persistent storage."""
5143 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5146 target_vdu
["interfaces"] = interfaces_wth_all_positions
5147 mock_prepare_vdu_cloud_init
.return_value
= {}
5148 mock_prepare_vdu_affinity_group_list
.return_value
= []
5150 new_kwargs
= deepcopy(kwargs
)
5155 "tasks_by_target_record_id": {},
5159 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
5160 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
5161 db
.get_one
.return_value
= vnfd
5162 result
= Ns
._process
_vdu
_params
(
5163 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5165 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5166 mock_locate_vdu_interfaces
.assert_not_called()
5167 mock_prepare_vdu_cloud_init
.assert_called_once()
5168 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5169 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5170 mock_prepare_vdu_interfaces
.assert_called_once_with(
5172 expected_extra_dict_copy
,
5179 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5180 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5181 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5182 mock_find_persistent_volumes
.assert_not_called()
5184 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5185 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5186 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5187 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5188 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5189 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5190 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5191 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5192 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5193 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5194 def test_process_vdu_params_interfaces_partially_located(
5196 mock_prepare_vdu_affinity_group_list
,
5197 mock_add_persistent_ordinary_disks_to_disk_list
,
5198 mock_add_persistent_root_disk_to_disk_list
,
5199 mock_find_persistent_volumes
,
5200 mock_find_persistent_root_volumes
,
5201 mock_prepare_vdu_ssh_keys
,
5202 mock_prepare_vdu_cloud_init
,
5203 mock_prepare_vdu_interfaces
,
5204 mock_locate_vdu_interfaces
,
5205 mock_sort_vdu_interfaces
,
5207 """Some interfaces have position."""
5208 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5211 target_vdu
["interfaces"] = [
5214 "ns-vld-id": "net1",
5216 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5219 "ns-vld-id": "mgmtnet",
5222 mock_prepare_vdu_cloud_init
.return_value
= {}
5223 mock_prepare_vdu_affinity_group_list
.return_value
= []
5224 persistent_root_disk
= {
5225 "persistent-root-volume": {
5226 "image_id": "ubuntu20.04",
5231 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5233 new_kwargs
= deepcopy(kwargs
)
5238 "tasks_by_target_record_id": {},
5243 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5244 db
.get_one
.return_value
= vnfd
5245 result
= Ns
._process
_vdu
_params
(
5246 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5248 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5249 mock_sort_vdu_interfaces
.assert_not_called()
5250 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5251 mock_prepare_vdu_cloud_init
.assert_called_once()
5252 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5253 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5254 mock_prepare_vdu_interfaces
.assert_called_once_with(
5256 expected_extra_dict_copy
,
5263 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5264 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5265 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5266 mock_find_persistent_volumes
.assert_not_called()
5267 mock_find_persistent_root_volumes
.assert_not_called()
5269 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5270 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5271 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5272 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5273 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5274 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5275 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5276 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5277 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5278 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5279 def test_process_vdu_params_no_interface_position(
5281 mock_prepare_vdu_affinity_group_list
,
5282 mock_add_persistent_ordinary_disks_to_disk_list
,
5283 mock_add_persistent_root_disk_to_disk_list
,
5284 mock_find_persistent_volumes
,
5285 mock_find_persistent_root_volumes
,
5286 mock_prepare_vdu_ssh_keys
,
5287 mock_prepare_vdu_cloud_init
,
5288 mock_prepare_vdu_interfaces
,
5289 mock_locate_vdu_interfaces
,
5290 mock_sort_vdu_interfaces
,
5292 """Interfaces do not have position."""
5293 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5296 target_vdu
["interfaces"] = interfaces_wthout_positions
5297 mock_prepare_vdu_cloud_init
.return_value
= {}
5298 mock_prepare_vdu_affinity_group_list
.return_value
= []
5299 persistent_root_disk
= {
5300 "persistent-root-volume": {
5301 "image_id": "ubuntu20.04",
5306 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5307 new_kwargs
= deepcopy(kwargs
)
5312 "tasks_by_target_record_id": {},
5317 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5318 db
.get_one
.return_value
= vnfd
5319 result
= Ns
._process
_vdu
_params
(
5320 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5322 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5323 mock_sort_vdu_interfaces
.assert_not_called()
5324 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5325 mock_prepare_vdu_cloud_init
.assert_called_once()
5326 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5327 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5328 mock_prepare_vdu_interfaces
.assert_called_once_with(
5330 expected_extra_dict_copy
,
5337 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5338 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5339 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5340 mock_find_persistent_volumes
.assert_not_called()
5341 mock_find_persistent_root_volumes
.assert_not_called()
5343 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5344 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5345 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5346 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5347 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5348 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5349 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5350 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5351 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5352 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5353 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5355 mock_prepare_vdu_affinity_group_list
,
5356 mock_add_persistent_ordinary_disks_to_disk_list
,
5357 mock_add_persistent_root_disk_to_disk_list
,
5358 mock_find_persistent_volumes
,
5359 mock_find_persistent_root_volumes
,
5360 mock_prepare_vdu_ssh_keys
,
5361 mock_prepare_vdu_cloud_init
,
5362 mock_prepare_vdu_interfaces
,
5363 mock_locate_vdu_interfaces
,
5364 mock_sort_vdu_interfaces
,
5366 """Prepare vdu interfaces method raises exception."""
5367 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5370 target_vdu
["interfaces"] = interfaces_wthout_positions
5371 mock_prepare_vdu_cloud_init
.return_value
= {}
5372 mock_prepare_vdu_affinity_group_list
.return_value
= []
5373 persistent_root_disk
= {
5374 "persistent-root-volume": {
5375 "image_id": "ubuntu20.04",
5380 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5381 new_kwargs
= deepcopy(kwargs
)
5386 "tasks_by_target_record_id": {},
5390 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5392 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5393 db
.get_one
.return_value
= vnfd
5394 with self
.assertRaises(Exception) as err
:
5395 Ns
._process
_vdu
_params
(
5396 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5398 self
.assertEqual(type(err
), TypeError)
5399 mock_sort_vdu_interfaces
.assert_not_called()
5400 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5401 mock_prepare_vdu_cloud_init
.assert_not_called()
5402 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5403 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5404 mock_prepare_vdu_interfaces
.assert_called_once()
5405 mock_prepare_vdu_ssh_keys
.assert_not_called()
5406 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5407 mock_find_persistent_volumes
.assert_not_called()
5408 mock_find_persistent_root_volumes
.assert_not_called()
5410 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5411 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5412 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5413 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5414 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5415 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5416 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5417 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5418 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5419 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5420 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5422 mock_prepare_vdu_affinity_group_list
,
5423 mock_add_persistent_ordinary_disks_to_disk_list
,
5424 mock_add_persistent_root_disk_to_disk_list
,
5425 mock_find_persistent_volumes
,
5426 mock_find_persistent_root_volumes
,
5427 mock_prepare_vdu_ssh_keys
,
5428 mock_prepare_vdu_cloud_init
,
5429 mock_prepare_vdu_interfaces
,
5430 mock_locate_vdu_interfaces
,
5431 mock_sort_vdu_interfaces
,
5433 """Add persistent root disk method raises exception."""
5434 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5437 target_vdu
["interfaces"] = interfaces_wthout_positions
5438 mock_prepare_vdu_cloud_init
.return_value
= {}
5439 mock_prepare_vdu_affinity_group_list
.return_value
= []
5440 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5441 new_kwargs
= deepcopy(kwargs
)
5446 "tasks_by_target_record_id": {},
5451 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5452 db
.get_one
.return_value
= vnfd
5453 with self
.assertRaises(Exception) as err
:
5454 Ns
._process
_vdu
_params
(
5455 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5457 self
.assertEqual(type(err
), KeyError)
5458 mock_sort_vdu_interfaces
.assert_not_called()
5459 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5460 mock_prepare_vdu_cloud_init
.assert_called_once()
5461 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5462 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5463 mock_prepare_vdu_interfaces
.assert_called_once_with(
5467 f
"{ns_preffix}:image.0",
5468 f
"{ns_preffix}:flavor.0",
5478 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5479 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5480 mock_find_persistent_volumes
.assert_not_called()
5481 mock_find_persistent_root_volumes
.assert_not_called()
5483 def test_select_persistent_root_disk(self
):
5484 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5485 vdu
["virtual-storage-desc"] = [
5486 "persistent-root-volume",
5487 "persistent-volume2",
5490 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5491 expected_result
= vsd
5492 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5493 self
.assertEqual(result
, expected_result
)
5495 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5496 """VDU first virtual-storage-desc is different than vsd id."""
5497 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5498 vdu
["virtual-storage-desc"] = [
5499 "persistent-volume2",
5500 "persistent-root-volume",
5503 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5504 expected_result
= None
5505 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5506 self
.assertEqual(result
, expected_result
)
5508 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5509 """vsd type is not persistent."""
5510 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5511 vdu
["virtual-storage-desc"] = [
5512 "persistent-volume2",
5513 "persistent-root-volume",
5516 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5517 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5518 expected_result
= None
5519 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5520 self
.assertEqual(result
, expected_result
)
5522 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5523 """vsd size is None."""
5524 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5525 vdu
["virtual-storage-desc"] = [
5526 "persistent-volume2",
5527 "persistent-root-volume",
5530 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5531 vsd
["size-of-storage"] = None
5532 expected_result
= None
5533 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5534 self
.assertEqual(result
, expected_result
)
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """VDU does not have virtual-storage-desc, so no root disk is selected.

    The vsd under test is the second entry of the VNFD fixture's
    ``virtual-storage-desc`` list (``persistent-root-volume``); the VDU
    fixture is used unmodified.
    """
    root_vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vdu = deepcopy(target_vdu_wth_persistent_storage)

    selected = Ns._select_persistent_root_disk(root_vsd, vdu)

    self.assertEqual(selected, None)
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """vsd is a list where a dict is expected: AttributeError must be raised.

    The whole ``virtual-storage-desc`` list of the VNFD fixture is passed
    instead of a single descriptor entry.
    """
    vsd_list = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
    vdu = deepcopy(target_vdu_wth_persistent_storage)

    # Callable form of assertRaises; equivalent to the context-manager form.
    self.assertRaises(
        AttributeError, Ns._select_persistent_root_disk, vsd_list, vdu
    )