1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
74 "vdu-storage-requirements": [
75 {"key": "keep-volume", "value": "true"},
79 "id": "ephemeral-volume",
80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
81 "size-of-storage": "1",
87 "path": "/app/storage/",
92 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
93 task_by_target_record_id
= {
94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
95 "extra_dict": {"params": {"net_type": "SR-IOV"}}
98 interfaces_wthout_positions
= [
109 "ns-vld-id": "mgmtnet",
112 interfaces_wth_all_positions
= [
125 "ns-vld-id": "mgmtnet",
129 target_vdu_wth_persistent_storage
= {
130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
133 "vdu-name": "several_volumes-VM",
137 "ns-vld-id": "mgmtnet",
140 "virtual-storages": [
142 "id": "persistent-volume2",
143 "size-of-storage": "10",
144 "type-of-storage": "persistent-storage:persistent-storage",
147 "id": "persistent-root-volume",
148 "size-of-storage": "10",
149 "type-of-storage": "persistent-storage:persistent-storage",
150 "vdu-storage-requirements": [
151 {"key": "keep-volume", "value": "true"},
155 "id": "ephemeral-volume",
156 "size-of-storage": "1",
157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
161 db
= MagicMock(name
="database mock")
162 fs
= MagicMock(name
="database mock")
163 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
164 vnf_preffix
= "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
165 vnfr_id
= "wh47f48-y870-4169-b758-5732e1ff40f5"
166 nsr_id
= "th47f48-9870-4169-b758-9732e1ff40f3"
168 "name": "sample_name",
170 expected_extra_dict
= {
172 f
"{ns_preffix}:image.0",
173 f
"{ns_preffix}:flavor.0",
176 "affinity_group_list": [],
177 "availability_zone_index": None,
178 "availability_zone_list": None,
179 "cloud_config": None,
180 "description": "several_volumes-VM",
182 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
183 "image_id": f
"TASK-{ns_preffix}:image.0",
184 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
190 expected_extra_dict2
= {
192 f
"{ns_preffix}:image.0",
193 f
"{ns_preffix}:flavor.0",
196 "affinity_group_list": [],
197 "availability_zone_index": None,
198 "availability_zone_list": None,
199 "cloud_config": None,
200 "description": "without_volumes-VM",
202 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
203 "image_id": f
"TASK-{ns_preffix}:image.0",
204 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
209 tasks_by_target_record_id
= {
210 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
213 "net_type": "SR-IOV",
220 "vdu2cloud_init": {},
222 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
223 "member-vnf-index-ref": "vnf-several-volumes",
226 vnfd_wthout_persistent_storage
= {
227 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
231 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
234 "id": "without_volumes-vnf",
235 "product-name": "without_volumes-vnf",
238 "id": "without_volumes-VM",
239 "name": "without_volumes-VM",
240 "sw-image-desc": "ubuntu20.04",
241 "alternative-sw-image-desc": [
245 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
249 "virtual-storage-desc": [
250 {"id": "root-volume", "size-of-storage": "10"},
252 "id": "ephemeral-volume",
253 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
254 "size-of-storage": "1",
260 "path": "/app/storage/",
266 target_vdu_wthout_persistent_storage
= {
267 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
270 "vdu-name": "without_volumes-VM",
274 "ns-vld-id": "mgmtnet",
277 "virtual-storages": [
280 "size-of-storage": "10",
283 "id": "ephemeral-volume",
284 "size-of-storage": "1",
285 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
289 cloud_init_content
= """
294 overwrite: {{is_override}}
297 - [ sh, -xc, "echo $(date) '{{command}}'" ]
308 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
312 class CopyingMock(MagicMock
):
313 def __call__(self
, *args
, **kwargs
):
314 args
= deepcopy(args
)
315 kwargs
= deepcopy(kwargs
)
316 return super(CopyingMock
, self
).__call
__(*args
, **kwargs
)
319 class TestNs(unittest
.TestCase
):
323 def test__create_task_without_extra_dict(self
):
325 "target_id": "vim_openstack_1",
326 "action_id": "123456",
328 "task_id": "123456:1",
329 "status": "SCHEDULED",
332 "target_record": "test_target_record",
333 "target_record_id": "test_target_record_id",
336 "action_id": "123456",
341 task
= Ns
._create
_task
(
342 deployment_info
=deployment_info
,
343 target_id
="vim_openstack_1",
346 target_record
="test_target_record",
347 target_record_id
="test_target_record_id",
350 self
.assertEqual(deployment_info
.get("task_index"), 2)
351 self
.assertDictEqual(task
, expected_result
)
353 def test__create_task(self
):
355 "target_id": "vim_openstack_1",
356 "action_id": "123456",
358 "task_id": "123456:1",
359 "status": "SCHEDULED",
362 "target_record": "test_target_record",
363 "target_record_id": "test_target_record_id",
364 # values coming from extra_dict
365 "params": "test_params",
366 "find_params": "test_find_params",
367 "depends_on": "test_depends_on",
370 "action_id": "123456",
375 task
= Ns
._create
_task
(
376 deployment_info
=deployment_info
,
377 target_id
="vim_openstack_1",
380 target_record
="test_target_record",
381 target_record_id
="test_target_record_id",
383 "params": "test_params",
384 "find_params": "test_find_params",
385 "depends_on": "test_depends_on",
389 self
.assertEqual(deployment_info
.get("task_index"), 2)
390 self
.assertDictEqual(task
, expected_result
)
392 @patch("osm_ng_ro.ns.time")
393 def test__create_ro_task(self
, mock_time
: Mock
):
394 now
= 1637324838.994551
395 mock_time
.return_value
= now
397 "target_id": "vim_openstack_1",
398 "action_id": "123456",
400 "task_id": "123456:1",
401 "status": "SCHEDULED",
404 "target_record": "test_target_record",
405 "target_record_id": "test_target_record_id",
406 # values coming from extra_dict
407 "params": "test_params",
408 "find_params": "test_find_params",
409 "depends_on": "test_depends_on",
415 "target_id": "vim_openstack_1",
418 "created_items": None,
432 ro_task
= Ns
._create
_ro
_task
(
433 target_id
="vim_openstack_1",
437 self
.assertDictEqual(ro_task
, expected_result
)
439 def test__process_image_params_with_empty_target_image(self
):
445 result
= Ns
._process
_image
_params
(
446 target_image
=target_image
,
449 target_record_id
=None,
452 self
.assertDictEqual(expected_result
, result
)
454 def test__process_image_params_with_wrong_target_image(self
):
459 "no_image": "to_see_here",
462 result
= Ns
._process
_image
_params
(
463 target_image
=target_image
,
466 target_record_id
=None,
469 self
.assertDictEqual(expected_result
, result
)
471 def test__process_image_params_with_image(self
):
483 result
= Ns
._process
_image
_params
(
484 target_image
=target_image
,
487 target_record_id
=None,
490 self
.assertDictEqual(expected_result
, result
)
492 def test__process_image_params_with_vim_image_id(self
):
501 "vim_image_id": "123456",
504 result
= Ns
._process
_image
_params
(
505 target_image
=target_image
,
508 target_record_id
=None,
511 self
.assertDictEqual(expected_result
, result
)
513 def test__process_image_params_with_image_checksum(self
):
517 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
522 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
525 result
= Ns
._process
_image
_params
(
526 target_image
=target_image
,
529 target_record_id
=None,
532 self
.assertDictEqual(expected_result
, result
)
534 def test__get_resource_allocation_params_with_empty_target_image(self
):
536 quota_descriptor
= {}
538 result
= Ns
._get
_resource
_allocation
_params
(
539 quota_descriptor
=quota_descriptor
,
542 self
.assertDictEqual(expected_result
, result
)
544 def test__get_resource_allocation_params_with_wrong_target_image(self
):
547 "no_quota": "present_here",
550 result
= Ns
._get
_resource
_allocation
_params
(
551 quota_descriptor
=quota_descriptor
,
554 self
.assertDictEqual(expected_result
, result
)
556 def test__get_resource_allocation_params_with_limit(self
):
564 result
= Ns
._get
_resource
_allocation
_params
(
565 quota_descriptor
=quota_descriptor
,
568 self
.assertDictEqual(expected_result
, result
)
570 def test__get_resource_allocation_params_with_reserve(self
):
578 result
= Ns
._get
_resource
_allocation
_params
(
579 quota_descriptor
=quota_descriptor
,
582 self
.assertDictEqual(expected_result
, result
)
584 def test__get_resource_allocation_params_with_shares(self
):
592 result
= Ns
._get
_resource
_allocation
_params
(
593 quota_descriptor
=quota_descriptor
,
596 self
.assertDictEqual(expected_result
, result
)
598 def test__get_resource_allocation_params(self
):
610 result
= Ns
._get
_resource
_allocation
_params
(
611 quota_descriptor
=quota_descriptor
,
614 self
.assertDictEqual(expected_result
, result
)
616 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
617 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
625 result
= Ns
._process
_guest
_epa
_quota
_params
(
626 guest_epa_quota
=guest_epa_quota
,
627 epa_vcpu_set
=epa_vcpu_set
,
630 self
.assertDictEqual(expected_result
, result
)
631 self
.assertFalse(resource_allocation
.called
)
633 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
634 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
642 result
= Ns
._process
_guest
_epa
_quota
_params
(
643 guest_epa_quota
=guest_epa_quota
,
644 epa_vcpu_set
=epa_vcpu_set
,
647 self
.assertDictEqual(expected_result
, result
)
648 self
.assertFalse(resource_allocation
.called
)
650 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
651 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
657 "no-quota": "nothing",
661 result
= Ns
._process
_guest
_epa
_quota
_params
(
662 guest_epa_quota
=guest_epa_quota
,
663 epa_vcpu_set
=epa_vcpu_set
,
666 self
.assertDictEqual(expected_result
, result
)
667 self
.assertFalse(resource_allocation
.called
)
669 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
670 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
676 "no-quota": "nothing",
680 result
= Ns
._process
_guest
_epa
_quota
_params
(
681 guest_epa_quota
=guest_epa_quota
,
682 epa_vcpu_set
=epa_vcpu_set
,
685 self
.assertDictEqual(expected_result
, result
)
686 self
.assertFalse(resource_allocation
.called
)
688 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
689 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
703 result
= Ns
._process
_guest
_epa
_quota
_params
(
704 guest_epa_quota
=guest_epa_quota
,
705 epa_vcpu_set
=epa_vcpu_set
,
708 self
.assertDictEqual(expected_result
, result
)
709 self
.assertFalse(resource_allocation
.called
)
711 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
712 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
732 resource_allocation_param
= {
737 resource_allocation
.return_value
= {
743 result
= Ns
._process
_guest
_epa
_quota
_params
(
744 guest_epa_quota
=guest_epa_quota
,
745 epa_vcpu_set
=epa_vcpu_set
,
748 resource_allocation
.assert_called_once_with(resource_allocation_param
)
749 self
.assertDictEqual(expected_result
, result
)
751 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
752 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
772 resource_allocation_param
= {
777 resource_allocation
.return_value
= {
783 result
= Ns
._process
_guest
_epa
_quota
_params
(
784 guest_epa_quota
=guest_epa_quota
,
785 epa_vcpu_set
=epa_vcpu_set
,
788 resource_allocation
.assert_called_once_with(resource_allocation_param
)
789 self
.assertDictEqual(expected_result
, result
)
791 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
792 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
812 resource_allocation_param
= {
817 resource_allocation
.return_value
= {
823 result
= Ns
._process
_guest
_epa
_quota
_params
(
824 guest_epa_quota
=guest_epa_quota
,
825 epa_vcpu_set
=epa_vcpu_set
,
828 resource_allocation
.assert_called_once_with(resource_allocation_param
)
829 self
.assertDictEqual(expected_result
, result
)
831 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
832 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
852 resource_allocation_param
= {
857 resource_allocation
.return_value
= {
863 result
= Ns
._process
_guest
_epa
_quota
_params
(
864 guest_epa_quota
=guest_epa_quota
,
865 epa_vcpu_set
=epa_vcpu_set
,
868 resource_allocation
.assert_called_once_with(resource_allocation_param
)
869 self
.assertDictEqual(expected_result
, result
)
871 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
872 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
892 resource_allocation_param
= {
897 resource_allocation
.return_value
= {
903 result
= Ns
._process
_guest
_epa
_quota
_params
(
904 guest_epa_quota
=guest_epa_quota
,
905 epa_vcpu_set
=epa_vcpu_set
,
908 resource_allocation
.assert_called_once_with(resource_allocation_param
)
909 self
.assertDictEqual(expected_result
, result
)
911 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
912 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
932 resource_allocation_param
= {
937 resource_allocation
.return_value
= {
943 result
= Ns
._process
_guest
_epa
_quota
_params
(
944 guest_epa_quota
=guest_epa_quota
,
945 epa_vcpu_set
=epa_vcpu_set
,
948 resource_allocation
.assert_called_once_with(resource_allocation_param
)
949 self
.assertDictEqual(expected_result
, result
)
951 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
952 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
972 resource_allocation_param
= {
977 resource_allocation
.return_value
= {
983 result
= Ns
._process
_guest
_epa
_quota
_params
(
984 guest_epa_quota
=guest_epa_quota
,
985 epa_vcpu_set
=epa_vcpu_set
,
988 resource_allocation
.assert_called_once_with(resource_allocation_param
)
989 self
.assertDictEqual(expected_result
, result
)
991 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
992 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1037 resource_allocation
.return_value
= {
1043 result
= Ns
._process
_guest
_epa
_quota
_params
(
1044 guest_epa_quota
=guest_epa_quota
,
1045 epa_vcpu_set
=epa_vcpu_set
,
1048 self
.assertTrue(resource_allocation
.called
)
1049 self
.assertDictEqual(expected_result
, result
)
1051 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1052 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1054 resource_allocation
,
1100 epa_vcpu_set
= False
1102 resource_allocation
.return_value
= {
1108 result
= Ns
._process
_guest
_epa
_quota
_params
(
1109 guest_epa_quota
=guest_epa_quota
,
1110 epa_vcpu_set
=epa_vcpu_set
,
1113 self
.assertTrue(resource_allocation
.called
)
1114 self
.assertDictEqual(expected_result
, result
)
1116 def test__process_guest_epa_numa_params_with_empty_numa_params(self
):
1117 expected_numa_result
= []
1118 expected_epa_vcpu_set_result
= False
1119 guest_epa_quota
= {}
1121 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1122 guest_epa_quota
=guest_epa_quota
,
1124 self
.assertEqual(expected_numa_result
, numa_result
)
1125 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1127 def test__process_guest_epa_numa_params_with_wrong_numa_params(self
):
1128 expected_numa_result
= []
1129 expected_epa_vcpu_set_result
= False
1130 guest_epa_quota
= {"no_nume": "here"}
1132 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1133 guest_epa_quota
=guest_epa_quota
,
1136 self
.assertEqual(expected_numa_result
, numa_result
)
1137 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1139 def test__process_guest_epa_numa_params_with_numa_node_policy(self
):
1140 expected_numa_result
= []
1141 expected_epa_vcpu_set_result
= False
1142 guest_epa_quota
= {"numa-node-policy": {}}
1144 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1145 guest_epa_quota
=guest_epa_quota
,
1148 self
.assertEqual(expected_numa_result
, numa_result
)
1149 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1151 def test__process_guest_epa_numa_params_with_no_node(self
):
1152 expected_numa_result
= []
1153 expected_epa_vcpu_set_result
= False
1155 "numa-node-policy": {
1160 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1161 guest_epa_quota
=guest_epa_quota
,
1164 self
.assertEqual(expected_numa_result
, numa_result
)
1165 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1167 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1168 expected_numa_result
= [{"cores": 3}]
1169 expected_epa_vcpu_set_result
= True
1171 "numa-node-policy": {
1180 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1181 guest_epa_quota
=guest_epa_quota
,
1184 self
.assertEqual(expected_numa_result
, numa_result
)
1185 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1187 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1188 expected_numa_result
= [{"paired_threads": 3}]
1189 expected_epa_vcpu_set_result
= True
1191 "numa-node-policy": {
1194 "paired-threads": {"num-paired-threads": "3"},
1200 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1201 guest_epa_quota
=guest_epa_quota
,
1204 self
.assertEqual(expected_numa_result
, numa_result
)
1205 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1207 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1208 expected_numa_result
= [
1210 "paired-threads-id": [("0", "1"), ("4", "5")],
1213 expected_epa_vcpu_set_result
= False
1215 "numa-node-policy": {
1219 "paired-thread-ids": [
1235 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1236 guest_epa_quota
=guest_epa_quota
,
1239 self
.assertEqual(expected_numa_result
, numa_result
)
1240 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1242 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1243 expected_numa_result
= [{"threads": 3}]
1244 expected_epa_vcpu_set_result
= True
1246 "numa-node-policy": {
1255 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1256 guest_epa_quota
=guest_epa_quota
,
1259 self
.assertEqual(expected_numa_result
, numa_result
)
1260 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1262 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1263 expected_numa_result
= [{"memory": 2}]
1264 expected_epa_vcpu_set_result
= False
1266 "numa-node-policy": {
1275 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1276 guest_epa_quota
=guest_epa_quota
,
1279 self
.assertEqual(expected_numa_result
, numa_result
)
1280 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1282 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1283 expected_numa_result
= [
1289 expected_epa_vcpu_set_result
= False
1291 "numa-node-policy": {
1292 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1296 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1297 guest_epa_quota
=guest_epa_quota
,
1300 self
.assertEqual(expected_numa_result
, numa_result
)
1301 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1303 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1304 expected_numa_result
= [
1315 expected_epa_vcpu_set_result
= False
1317 "numa-node-policy": {
1319 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1320 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1325 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1326 guest_epa_quota
=guest_epa_quota
,
1329 self
.assertEqual(expected_numa_result
, numa_result
)
1330 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1332 def test__process_guest_epa_numa_params_with_1_node(self
):
1333 expected_numa_result
= [
1338 "paired_threads": 3,
1339 "paired-threads-id": [("0", "1"), ("4", "5")],
1344 expected_epa_vcpu_set_result
= True
1346 "numa-node-policy": {
1351 "num-paired-threads": "3",
1352 "paired-thread-ids": [
1370 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1371 guest_epa_quota
=guest_epa_quota
,
1374 self
.assertEqual(expected_numa_result
, numa_result
)
1375 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1377 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1378 expected_numa_result
= [
1381 "paired_threads": 3,
1382 "paired-threads-id": [("0", "1"), ("4", "5")],
1388 "paired_threads": 7,
1389 "paired-threads-id": [("2", "3"), ("5", "6")],
1394 expected_epa_vcpu_set_result
= True
1396 "numa-node-policy": {
1401 "num-paired-threads": "3",
1402 "paired-thread-ids": [
1419 "num-paired-threads": "7",
1420 "paired-thread-ids": [
1438 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1439 guest_epa_quota
=guest_epa_quota
,
1442 self
.assertEqual(expected_numa_result
, numa_result
)
1443 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1445 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1446 expected_numa_result
= {}
1447 expected_epa_vcpu_set_result
= False
1448 guest_epa_quota
= {}
1450 epa_vcpu_set
= False
1452 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1453 guest_epa_quota
=guest_epa_quota
,
1454 vcpu_count
=vcpu_count
,
1455 epa_vcpu_set
=epa_vcpu_set
,
1458 self
.assertDictEqual(expected_numa_result
, numa_result
)
1459 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1461 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1462 expected_numa_result
= {}
1463 expected_epa_vcpu_set_result
= False
1465 "no-cpu-pinning-policy": "here",
1468 epa_vcpu_set
= False
1470 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1471 guest_epa_quota
=guest_epa_quota
,
1472 vcpu_count
=vcpu_count
,
1473 epa_vcpu_set
=epa_vcpu_set
,
1476 self
.assertDictEqual(expected_numa_result
, numa_result
)
1477 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1479 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1480 expected_numa_result
= {}
1481 expected_epa_vcpu_set_result
= True
1483 "cpu-pinning-policy": "DEDICATED",
1488 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1489 guest_epa_quota
=guest_epa_quota
,
1490 vcpu_count
=vcpu_count
,
1491 epa_vcpu_set
=epa_vcpu_set
,
1494 self
.assertDictEqual(expected_numa_result
, numa_result
)
1495 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1497 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self
):
1498 expected_numa_result
= {"threads": 3}
1499 expected_epa_vcpu_set_result
= True
1501 "cpu-pinning-policy": "DEDICATED",
1502 "cpu-thread-pinning-policy": "PREFER",
1505 epa_vcpu_set
= False
1507 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1508 guest_epa_quota
=guest_epa_quota
,
1509 vcpu_count
=vcpu_count
,
1510 epa_vcpu_set
=epa_vcpu_set
,
1513 self
.assertDictEqual(expected_numa_result
, numa_result
)
1514 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1516 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self
):
1517 expected_numa_result
= {"cores": 3}
1518 expected_epa_vcpu_set_result
= True
1520 "cpu-pinning-policy": "DEDICATED",
1521 "cpu-thread-pinning-policy": "ISOLATE",
1524 epa_vcpu_set
= False
1526 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1527 guest_epa_quota
=guest_epa_quota
,
1528 vcpu_count
=vcpu_count
,
1529 epa_vcpu_set
=epa_vcpu_set
,
1532 self
.assertDictEqual(expected_numa_result
, numa_result
)
1533 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1535 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self
):
1536 expected_numa_result
= {"threads": 3}
1537 expected_epa_vcpu_set_result
= True
1539 "cpu-pinning-policy": "DEDICATED",
1540 "cpu-thread-pinning-policy": "REQUIRE",
1543 epa_vcpu_set
= False
1545 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1546 guest_epa_quota
=guest_epa_quota
,
1547 vcpu_count
=vcpu_count
,
1548 epa_vcpu_set
=epa_vcpu_set
,
1551 self
.assertDictEqual(expected_numa_result
, numa_result
)
1552 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1554 def test__process_guest_epa_cpu_pinning_params(self
):
1555 expected_numa_result
= {"threads": 3}
1556 expected_epa_vcpu_set_result
= True
1558 "cpu-pinning-policy": "DEDICATED",
1561 epa_vcpu_set
= False
1563 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1564 guest_epa_quota
=guest_epa_quota
,
1565 vcpu_count
=vcpu_count
,
1566 epa_vcpu_set
=epa_vcpu_set
,
1569 self
.assertDictEqual(expected_numa_result
, numa_result
)
1570 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1572 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1573 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1574 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1575 def test__process_guest_epa_params_with_empty_params(
1577 guest_epa_numa_params
,
1578 guest_epa_cpu_pinning_params
,
1579 guest_epa_quota_params
,
1581 expected_result
= {}
1584 result
= Ns
._process
_epa
_params
(
1585 target_flavor
=target_flavor
,
1588 self
.assertDictEqual(expected_result
, result
)
1589 self
.assertFalse(guest_epa_numa_params
.called
)
1590 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1591 self
.assertFalse(guest_epa_quota_params
.called
)
1593 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1594 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1595 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1596 def test__process_guest_epa_params_with_wrong_params(
1598 guest_epa_numa_params
,
1599 guest_epa_cpu_pinning_params
,
1600 guest_epa_quota_params
,
1602 expected_result
= {}
1604 "no-guest-epa": "here",
1607 result
= Ns
._process
_epa
_params
(
1608 target_flavor
=target_flavor
,
1611 self
.assertDictEqual(expected_result
, result
)
1612 self
.assertFalse(guest_epa_numa_params
.called
)
1613 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1614 self
.assertFalse(guest_epa_quota_params
.called
)
1616 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1617 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1618 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1619 def test__process_guest_epa_params(
1621 guest_epa_numa_params
,
1622 guest_epa_cpu_pinning_params
,
1623 guest_epa_quota_params
,
1626 "mem-policy": "STRICT",
1631 "numa-node-policy": {
1632 "mem-policy": "STRICT",
1637 guest_epa_numa_params
.return_value
= ({}, False)
1638 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1639 guest_epa_quota_params
.return_value
= {}
1641 result
= Ns
._process
_epa
_params
(
1642 target_flavor
=target_flavor
,
1645 self
.assertDictEqual(expected_result
, result
)
1646 self
.assertTrue(guest_epa_numa_params
.called
)
1647 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1648 self
.assertTrue(guest_epa_quota_params
.called
)
1650 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1651 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1652 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1653 def test__process_guest_epa_params_with_mempage_size(
1655 guest_epa_numa_params
,
1656 guest_epa_cpu_pinning_params
,
1657 guest_epa_quota_params
,
1660 "mempage-size": "1G",
1661 "mem-policy": "STRICT",
1666 "mempage-size": "1G",
1667 "numa-node-policy": {
1668 "mem-policy": "STRICT",
1673 guest_epa_numa_params
.return_value
= ({}, False)
1674 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1675 guest_epa_quota_params
.return_value
= {}
1677 result
= Ns
._process
_epa
_params
(
1678 target_flavor
=target_flavor
,
1681 self
.assertDictEqual(expected_result
, result
)
1682 self
.assertTrue(guest_epa_numa_params
.called
)
1683 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1684 self
.assertTrue(guest_epa_quota_params
.called
)
1686 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1687 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1688 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1689 def test__process_guest_epa_params_with_numa(
1691 guest_epa_numa_params
,
1692 guest_epa_cpu_pinning_params
,
1693 guest_epa_quota_params
,
1696 "mempage-size": "1G",
1697 "cpu-pinning-policy": "DEDICATED",
1698 "cpu-thread-pinning-policy": "PREFER",
1703 "paired-threads": 3,
1704 "paired-threads-id": [("0", "1"), ("4", "5")],
1708 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1709 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1710 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1711 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1716 "mempage-size": "1G",
1717 "cpu-pinning-policy": "DEDICATED",
1718 "cpu-thread-pinning-policy": "PREFER",
1719 "numa-node-policy": {
1724 "num-paired-threads": "3",
1725 "paired-thread-ids": [
1764 guest_epa_numa_params
.return_value
= (
1768 "paired-threads": 3,
1769 "paired-threads-id": [("0", "1"), ("4", "5")],
1776 guest_epa_cpu_pinning_params
.return_value
= (
1782 guest_epa_quota_params
.return_value
= {
1805 result
= Ns
._process
_epa
_params
(
1806 target_flavor
=target_flavor
,
1808 self
.assertEqual(expected_result
, result
)
1809 self
.assertTrue(guest_epa_numa_params
.called
)
1810 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1811 self
.assertTrue(guest_epa_quota_params
.called
)
1813 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1814 def test__process_flavor_params_with_empty_target_flavor(
1823 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1828 target_record_id
= ""
1830 with self
.assertRaises(KeyError):
1831 Ns
._process
_flavor
_params
(
1832 target_flavor
=target_flavor
,
1835 target_record_id
=target_record_id
,
1838 self
.assertFalse(epa_params
.called
)
1840 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1841 def test__process_flavor_params_with_wrong_target_flavor(
1847 "no-target-flavor": "here",
1851 target_record_id
= ""
1853 with self
.assertRaises(KeyError):
1854 Ns
._process
_flavor
_params
(
1855 target_flavor
=target_flavor
,
1858 target_record_id
=target_record_id
,
1861 self
.assertFalse(epa_params
.called
)
1863 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1864 def test__process_flavor_params_with_empty_indata(
1889 "memory-mb": "1024",
1894 target_record_id
= ""
1896 epa_params
.return_value
= {}
1898 result
= Ns
._process
_flavor
_params
(
1899 target_flavor
=target_flavor
,
1902 target_record_id
=target_record_id
,
1905 self
.assertTrue(epa_params
.called
)
1906 self
.assertDictEqual(result
, expected_result
)
1908 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1909 def test__process_flavor_params_with_wrong_indata(
1934 "memory-mb": "1024",
1941 target_record_id
= ""
1943 epa_params
.return_value
= {}
1945 result
= Ns
._process
_flavor
_params
(
1946 target_flavor
=target_flavor
,
1949 target_record_id
=target_record_id
,
1952 self
.assertTrue(epa_params
.called
)
1953 self
.assertDictEqual(result
, expected_result
)
1955 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1956 def test__process_flavor_params_with_ephemeral_disk(
1964 db
.get_one
.return_value
= {
1965 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1970 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1974 "id": "without_volumes-vnf",
1975 "product-name": "without_volumes-vnf",
1978 "id": "without_volumes-VM",
1979 "name": "without_volumes-VM",
1980 "sw-image-desc": "ubuntu20.04",
1981 "alternative-sw-image-desc": [
1983 "ubuntu20.04-azure",
1985 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
1989 "virtual-storage-desc": [
1990 {"id": "root-volume", "size-of-storage": "10"},
1992 "id": "ephemeral-volume",
1993 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
1994 "size-of-storage": "1",
2000 "path": "/app/storage/",
2028 "memory-mb": "1024",
2036 "ns-flavor-id": "test_id",
2037 "virtual-storages": [
2039 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2040 "size-of-storage": "10",
2045 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2050 target_record_id
= ""
2052 epa_params
.return_value
= {}
2054 result
= Ns
._process
_flavor
_params
(
2055 target_flavor
=target_flavor
,
2058 target_record_id
=target_record_id
,
2062 self
.assertTrue(epa_params
.called
)
2063 self
.assertDictEqual(result
, expected_result
)
2065 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2066 def test__process_flavor_params_with_swap_disk(
2094 "memory-mb": "1024",
2102 "ns-flavor-id": "test_id",
2103 "virtual-storages": [
2105 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2106 "size-of-storage": "20",
2111 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2116 target_record_id
= ""
2118 epa_params
.return_value
= {}
2120 result
= Ns
._process
_flavor
_params
(
2121 target_flavor
=target_flavor
,
2124 target_record_id
=target_record_id
,
2127 self
.assertTrue(epa_params
.called
)
2128 self
.assertDictEqual(result
, expected_result
)
2130 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2131 def test__process_flavor_params_with_persistent_root_disk(
2140 db
.get_one
.return_value
= {
2141 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2146 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2150 "id": "several_volumes-vnf",
2151 "product-name": "several_volumes-vnf",
2154 "id": "several_volumes-VM",
2155 "name": "several_volumes-VM",
2156 "sw-image-desc": "ubuntu20.04",
2157 "alternative-sw-image-desc": [
2159 "ubuntu20.04-azure",
2161 "virtual-storage-desc": [
2162 "persistent-root-volume",
2167 "virtual-storage-desc": [
2169 "id": "persistent-root-volume",
2170 "type-of-storage": "persistent-storage:persistent-storage",
2171 "size-of-storage": "10",
2177 "path": "/app/storage/",
2203 "memory-mb": "1024",
2211 "vdu-name": "several_volumes-VM",
2212 "ns-flavor-id": "test_id",
2213 "virtual-storages": [
2215 "type-of-storage": "persistent-storage:persistent-storage",
2216 "size-of-storage": "10",
2221 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2226 target_record_id
= ""
2228 epa_params
.return_value
= {}
2230 result
= Ns
._process
_flavor
_params
(
2231 target_flavor
=target_flavor
,
2234 target_record_id
=target_record_id
,
2238 self
.assertTrue(epa_params
.called
)
2239 self
.assertDictEqual(result
, expected_result
)
2241 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2242 def test__process_flavor_params_with_epa_params(
2254 "numa": "there-is-numa-here",
2265 "numa": "there-is-numa-here",
2274 "memory-mb": "1024",
2282 "ns-flavor-id": "test_id",
2285 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2290 target_record_id
= ""
2292 epa_params
.return_value
= {
2293 "numa": "there-is-numa-here",
2296 result
= Ns
._process
_flavor
_params
(
2297 target_flavor
=target_flavor
,
2300 target_record_id
=target_record_id
,
2303 self
.assertTrue(epa_params
.called
)
2304 self
.assertDictEqual(result
, expected_result
)
2306 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2307 def test__process_flavor_params(
2316 db
.get_one
.return_value
= {
2317 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2322 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2326 "id": "without_volumes-vnf",
2327 "product-name": "without_volumes-vnf",
2330 "id": "without_volumes-VM",
2331 "name": "without_volumes-VM",
2332 "sw-image-desc": "ubuntu20.04",
2333 "alternative-sw-image-desc": [
2335 "ubuntu20.04-azure",
2337 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2341 "virtual-storage-desc": [
2342 {"id": "root-volume", "size-of-storage": "10"},
2344 "id": "ephemeral-volume",
2345 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2346 "size-of-storage": "1",
2352 "path": "/app/storage/",
2367 "numa": "there-is-numa-here",
2380 "numa": "there-is-numa-here",
2389 "memory-mb": "1024",
2397 "ns-flavor-id": "test_id",
2398 "virtual-storages": [
2400 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2401 "size-of-storage": "10",
2404 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2405 "size-of-storage": "20",
2410 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2415 target_record_id
= ""
2417 epa_params
.return_value
= {
2418 "numa": "there-is-numa-here",
2421 result
= Ns
._process
_flavor
_params
(
2422 target_flavor
=target_flavor
,
2425 target_record_id
=target_record_id
,
2429 self
.assertTrue(epa_params
.called
)
2430 self
.assertDictEqual(result
, expected_result
)
2432 def test__ip_profile_to_ro_with_none(self
):
2435 result
= Ns
._ip
_profile
_to
_ro
(
2436 ip_profile
=ip_profile
,
2439 self
.assertIsNone(result
)
2441 def test__ip_profile_to_ro_with_empty_profile(self
):
2444 result
= Ns
._ip
_profile
_to
_ro
(
2445 ip_profile
=ip_profile
,
2448 self
.assertIsNone(result
)
2450 def test__ip_profile_to_ro_with_wrong_profile(self
):
2452 "no-profile": "here",
2455 "ip_version": "IPv4",
2456 "subnet_address": None,
2457 "gateway_address": None,
2458 "dhcp_enabled": False,
2459 "dhcp_start_address": None,
2463 result
= Ns
._ip
_profile
_to
_ro
(
2464 ip_profile
=ip_profile
,
2467 self
.assertDictEqual(expected_result
, result
)
2469 def test__ip_profile_to_ro_with_ipv4_profile(self
):
2471 "ip-version": "ipv4",
2472 "subnet-address": "192.168.0.0/24",
2473 "gateway-address": "192.168.0.254",
2476 "start-address": "192.168.0.10",
2481 "ip_version": "IPv4",
2482 "subnet_address": "192.168.0.0/24",
2483 "gateway_address": "192.168.0.254",
2484 "dhcp_enabled": True,
2485 "dhcp_start_address": "192.168.0.10",
2489 result
= Ns
._ip
_profile
_to
_ro
(
2490 ip_profile
=ip_profile
,
2493 self
.assertDictEqual(expected_result
, result
)
2495 def test__ip_profile_to_ro_with_ipv6_profile(self
):
2497 "ip-version": "ipv6",
2498 "subnet-address": "2001:0200:0001::/48",
2499 "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2502 "start-address": "2001:0200:0001::0010",
2507 "ip_version": "IPv6",
2508 "subnet_address": "2001:0200:0001::/48",
2509 "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
2510 "dhcp_enabled": True,
2511 "dhcp_start_address": "2001:0200:0001::0010",
2515 result
= Ns
._ip
_profile
_to
_ro
(
2516 ip_profile
=ip_profile
,
2519 self
.assertDictEqual(expected_result
, result
)
2521 def test__ip_profile_to_ro_with_dns_server(self
):
2523 "ip-version": "ipv4",
2524 "subnet-address": "192.168.0.0/24",
2525 "gateway-address": "192.168.0.254",
2528 "start-address": "192.168.0.10",
2533 "address": "8.8.8.8",
2536 "address": "1.1.1.1",
2539 "address": "1.0.0.1",
2544 "ip_version": "IPv4",
2545 "subnet_address": "192.168.0.0/24",
2546 "gateway_address": "192.168.0.254",
2547 "dhcp_enabled": True,
2548 "dhcp_start_address": "192.168.0.10",
2550 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2553 result
= Ns
._ip
_profile
_to
_ro
(
2554 ip_profile
=ip_profile
,
2557 self
.assertDictEqual(expected_result
, result
)
2559 def test__ip_profile_to_ro_with_security_group(self
):
2561 "ip-version": "ipv4",
2562 "subnet-address": "192.168.0.0/24",
2563 "gateway-address": "192.168.0.254",
2566 "start-address": "192.168.0.10",
2570 "some-security-group": "here",
2574 "ip_version": "IPv4",
2575 "subnet_address": "192.168.0.0/24",
2576 "gateway_address": "192.168.0.254",
2577 "dhcp_enabled": True,
2578 "dhcp_start_address": "192.168.0.10",
2581 "some-security-group": "here",
2585 result
= Ns
._ip
_profile
_to
_ro
(
2586 ip_profile
=ip_profile
,
2589 self
.assertDictEqual(expected_result
, result
)
2591 def test__ip_profile_to_ro(self
):
2593 "ip-version": "ipv4",
2594 "subnet-address": "192.168.0.0/24",
2595 "gateway-address": "192.168.0.254",
2598 "start-address": "192.168.0.10",
2603 "address": "8.8.8.8",
2606 "address": "1.1.1.1",
2609 "address": "1.0.0.1",
2613 "some-security-group": "here",
2617 "ip_version": "IPv4",
2618 "subnet_address": "192.168.0.0/24",
2619 "gateway_address": "192.168.0.254",
2620 "dhcp_enabled": True,
2621 "dhcp_start_address": "192.168.0.10",
2623 "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
2625 "some-security-group": "here",
2629 result
= Ns
._ip
_profile
_to
_ro
(
2630 ip_profile
=ip_profile
,
2633 self
.assertDictEqual(expected_result
, result
)
2635 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2636 def test__process_net_params_with_empty_params(
2647 "provider_network": "some-profile-here",
2649 target_record_id
= ""
2652 "net_name": "ns-name-vld-name",
2653 "net_type": "bridge",
2655 "some_ip_profile": "here",
2657 "provider_network_profile": "some-profile-here",
2661 ip_profile_to_ro
.return_value
= {
2662 "some_ip_profile": "here",
2665 result
= Ns
._process
_net
_params
(
2666 target_vld
=target_vld
,
2669 target_record_id
=target_record_id
,
2672 self
.assertDictEqual(expected_result
, result
)
2673 self
.assertTrue(ip_profile_to_ro
.called
)
2675 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2676 def test__process_net_params_with_vim_info_sdn(
2688 "sdn-ports": ["some", "ports", "here"],
2689 "vlds": ["some", "vlds", "here"],
2692 target_record_id
= "vld.sdn.something"
2695 "sdn-ports": ["some", "ports", "here"],
2696 "vlds": ["some", "vlds", "here"],
2701 result
= Ns
._process
_net
_params
(
2702 target_vld
=target_vld
,
2705 target_record_id
=target_record_id
,
2708 self
.assertDictEqual(expected_result
, result
)
2709 self
.assertFalse(ip_profile_to_ro
.called
)
2711 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2712 def test__process_net_params_with_vim_info_sdn_target_vim(
2724 "sdn-ports": ["some", "ports", "here"],
2725 "vlds": ["some", "vlds", "here"],
2726 "target_vim": "some-vim",
2729 target_record_id
= "vld.sdn.something"
2731 "depends_on": ["some-vim vld.sdn"],
2733 "sdn-ports": ["some", "ports", "here"],
2734 "vlds": ["some", "vlds", "here"],
2735 "target_vim": "some-vim",
2740 result
= Ns
._process
_net
_params
(
2741 target_vld
=target_vld
,
2744 target_record_id
=target_record_id
,
2747 self
.assertDictEqual(expected_result
, result
)
2748 self
.assertFalse(ip_profile_to_ro
.called
)
2750 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2751 def test__process_net_params_with_vim_network_name(
2762 "vim_network_name": "some-network-name",
2764 target_record_id
= "vld.sdn.something"
2768 "name": "some-network-name",
2773 result
= Ns
._process
_net
_params
(
2774 target_vld
=target_vld
,
2777 target_record_id
=target_record_id
,
2780 self
.assertDictEqual(expected_result
, result
)
2781 self
.assertFalse(ip_profile_to_ro
.called
)
2783 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2784 def test__process_net_params_with_vim_network_id(
2795 "vim_network_id": "some-network-id",
2797 target_record_id
= "vld.sdn.something"
2801 "id": "some-network-id",
2806 result
= Ns
._process
_net
_params
(
2807 target_vld
=target_vld
,
2810 target_record_id
=target_record_id
,
2813 self
.assertDictEqual(expected_result
, result
)
2814 self
.assertFalse(ip_profile_to_ro
.called
)
2816 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2817 def test__process_net_params_with_mgmt_network(
2824 "mgmt-network": "some-mgmt-network",
2830 target_record_id
= "vld.sdn.something"
2838 result
= Ns
._process
_net
_params
(
2839 target_vld
=target_vld
,
2842 target_record_id
=target_record_id
,
2845 self
.assertDictEqual(expected_result
, result
)
2846 self
.assertFalse(ip_profile_to_ro
.called
)
2848 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2849 def test__process_net_params_with_underlay_eline(
2855 "underlay": "some-underlay-here",
2862 "provider_network": "some-profile-here",
2864 target_record_id
= ""
2868 "some_ip_profile": "here",
2870 "net_name": "ns-name-vld-name",
2872 "provider_network_profile": "some-profile-here",
2876 ip_profile_to_ro
.return_value
= {
2877 "some_ip_profile": "here",
2880 result
= Ns
._process
_net
_params
(
2881 target_vld
=target_vld
,
2884 target_record_id
=target_record_id
,
2887 self
.assertDictEqual(expected_result
, result
)
2888 self
.assertTrue(ip_profile_to_ro
.called
)
2890 @patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
2891 def test__process_net_params_with_underlay_elan(
2897 "underlay": "some-underlay-here",
2904 "provider_network": "some-profile-here",
2906 target_record_id
= ""
2910 "some_ip_profile": "here",
2912 "net_name": "ns-name-vld-name",
2914 "provider_network_profile": "some-profile-here",
2918 ip_profile_to_ro
.return_value
= {
2919 "some_ip_profile": "here",
2922 result
= Ns
._process
_net
_params
(
2923 target_vld
=target_vld
,
2926 target_record_id
=target_record_id
,
2929 self
.assertDictEqual(expected_result
, result
)
2930 self
.assertTrue(ip_profile_to_ro
.called
)
2932 def test__get_cloud_init_exception(self
):
2933 db_mock
= MagicMock(name
="database mock")
2938 with self
.assertRaises(NsException
):
2939 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2941 def test__get_cloud_init_file_fs_exception(self
):
2942 db_mock
= MagicMock(name
="database mock")
2945 location
= "vnfr_id_123456:file:test_file"
2946 db_mock
.get_one
.return_value
= {
2949 "folder": "/home/osm",
2950 "pkg-dir": "vnfr_test_dir",
2955 with self
.assertRaises(NsException
):
2956 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2958 def test__get_cloud_init_file(self
):
2959 db_mock
= MagicMock(name
="database mock")
2960 fs_mock
= MagicMock(name
="filesystem mock")
2961 file_mock
= MagicMock(name
="file mock")
2963 location
= "vnfr_id_123456:file:test_file"
2964 cloud_init_content
= "this is a cloud init file content"
2966 db_mock
.get_one
.return_value
= {
2969 "folder": "/home/osm",
2970 "pkg-dir": "vnfr_test_dir",
2974 fs_mock
.file_open
.return_value
= file_mock
2975 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2977 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2979 self
.assertEqual(cloud_init_content
, result
)
2981 def test__get_cloud_init_vdu(self
):
2982 db_mock
= MagicMock(name
="database mock")
2985 location
= "vnfr_id_123456:vdu:0"
2986 cloud_init_content
= "this is a cloud init file content"
2988 db_mock
.get_one
.return_value
= {
2991 "cloud-init": cloud_init_content
,
2996 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2998 self
.assertEqual(cloud_init_content
, result
)
3000 @patch("jinja2.Environment.__init__")
3001 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
3002 cloud_init_content
= None
3006 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
3008 with self
.assertRaises(NsException
):
3010 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3013 @patch("jinja2.Environment.__init__")
3014 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
3015 cloud_init_content
= None
3019 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
3021 with self
.assertRaises(NsException
):
3023 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3026 @patch("jinja2.Environment.__init__")
3027 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
3028 cloud_init_content
= None
3032 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
3034 with self
.assertRaises(NsException
):
3036 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3039 def test_rendering_jinja2_temp_without_special_characters(self
):
3040 cloud_init_content
= """
3043 table_type: {{type}}
3045 overwrite: {{is_override}}
3048 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3052 "is_override": "False",
3053 "command": "; mkdir abc",
3055 context
= "cloud-init for VM"
3056 expected_result
= """
3064 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
3066 result
= Ns
._parse
_jinja
2(
3067 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3069 self
.assertEqual(result
, expected_result
)
3071 def test_rendering_jinja2_temp_with_special_characters(self
):
3072 cloud_init_content
= """
3075 table_type: {{type}}
3077 overwrite: {{is_override}}
3080 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3084 "is_override": "False",
3085 "command": "& rm -rf",
3087 context
= "cloud-init for VM"
3088 expected_result
= """
3096 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3098 result
= Ns
._parse
_jinja
2(
3099 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
3101 self
.assertNotEqual(result
, expected_result
)
3103 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
3104 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
3105 mock_environment
.return_value
= Environment(
3106 undefined
=StrictUndefined
,
3107 autoescape
=select_autoescape(default_for_string
=False, default
=False),
3109 cloud_init_content
= """
3112 table_type: {{type}}
3114 overwrite: {{is_override}}
3117 - [ sh, -xc, "echo $(date) '{{command}}'" ]
3121 "is_override": "False",
3122 "command": "& rm -rf /",
3124 context
= "cloud-init for VM"
3125 expected_result
= """
3133 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
3135 result
= Ns
._parse
_jinja
2(
3136 cloud_init_content
=cloud_init_content
,
3140 self
.assertEqual(result
, expected_result
)
3142 @patch("osm_ng_ro.ns.Ns._assign_vim")
3143 def test__rebuild_start_stop_task(self
, assign_vim
):
3146 actions
= ["start", "stop", "rebuild"]
3147 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3148 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
3150 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3151 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3153 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
3154 t
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3155 for action
in actions
:
3157 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3158 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3159 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3160 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
3161 "status": "SCHEDULED",
3164 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
3165 "target_record_id": t
,
3167 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3171 extra_dict
["params"] = {
3172 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3175 task
= self
.ns
.rebuild_start_stop_task(
3185 self
.assertEqual(task
.get("action_id"), action_id
)
3186 self
.assertEqual(task
.get("nsr_id"), nsr_id
)
3187 self
.assertEqual(task
.get("target_id"), target_vim
)
3188 self
.assertDictEqual(task
, expected_result
)
3190 @patch("osm_ng_ro.ns.Ns._assign_vim")
3191 def test_verticalscale_task(self
, assign_vim
):
3195 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3196 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3198 target_record_id
= (
3199 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3200 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3204 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3205 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3206 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3207 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3208 "status": "SCHEDULED",
3210 "item": "verticalscale",
3211 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3212 "target_record_id": target_record_id
,
3214 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3215 "flavor_dict": "flavor_dict",
3219 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3221 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3224 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3225 extra_dict
["params"] = {
3226 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3227 "flavor_dict": "flavor_dict",
3229 task
= self
.ns
.verticalscale_task(
3230 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3233 self
.assertDictEqual(task
, expected_result
)
3235 @patch("osm_ng_ro.ns.Ns._assign_vim")
3236 def test_migrate_task(self
, assign_vim
):
3240 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3241 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3243 target_record_id
= (
3244 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3245 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3249 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3250 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3251 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3252 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3253 "status": "SCHEDULED",
3256 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3257 "target_record_id": target_record_id
,
3259 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3260 "migrate_host": "migrateToHost",
3264 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3266 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3269 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3270 extra_dict
["params"] = {
3271 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3272 "migrate_host": "migrateToHost",
3274 task
= self
.ns
.migrate_task(
3275 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3278 self
.assertDictEqual(task
, expected_result
)
3281 class TestProcessVduParams(unittest
.TestCase
):
3284 self
.logger
= CopyingMock(autospec
=True)
3286 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3287 def test_find_persistent_root_volumes_empty_instantiation_vol_list(
3288 self
, mock_volume_keeping_required
3290 """Find persistent root volume, instantiation_vol_list is empty."""
3291 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3292 target_vdu
= target_vdu_wth_persistent_storage
3293 vdu_instantiation_volumes_list
= []
3295 mock_volume_keeping_required
.return_value
= True
3296 expected_root_disk
= {
3297 "id": "persistent-root-volume",
3298 "type-of-storage": "persistent-storage:persistent-storage",
3299 "size-of-storage": "10",
3300 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
3302 expected_persist_root_disk
= {
3303 "persistent-root-volume": {
3304 "image_id": "ubuntu20.04",
3309 expected_disk_list
= [
3311 "image_id": "ubuntu20.04",
3316 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3317 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3319 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3320 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3321 self
.assertEqual(disk_list
, expected_disk_list
)
3322 self
.assertEqual(len(disk_list
), 1)
3324 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3325 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
3326 self
, mock_volume_keeping_required
3328 """Find persistent root volume, always selects the first vsd as root volume."""
3329 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3330 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3331 "persistent-volume2",
3332 "persistent-root-volume",
3335 target_vdu
= target_vdu_wth_persistent_storage
3336 vdu_instantiation_volumes_list
= []
3338 mock_volume_keeping_required
.return_value
= True
3339 expected_root_disk
= {
3340 "id": "persistent-volume2",
3341 "type-of-storage": "persistent-storage:persistent-storage",
3342 "size-of-storage": "10",
3344 expected_persist_root_disk
= {
3345 "persistent-volume2": {
3346 "image_id": "ubuntu20.04",
3351 expected_disk_list
= [
3353 "image_id": "ubuntu20.04",
3358 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3359 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3361 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3362 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3363 self
.assertEqual(disk_list
, expected_disk_list
)
3364 self
.assertEqual(len(disk_list
), 1)
3366 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3367 def test_find_persistent_root_volumes_empty_size_of_storage(
3368 self
, mock_volume_keeping_required
3370 """Find persistent root volume, size of storage is empty."""
3371 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3372 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3373 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3374 "persistent-volume2",
3375 "persistent-root-volume",
3378 target_vdu
= target_vdu_wth_persistent_storage
3379 vdu_instantiation_volumes_list
= []
3381 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3382 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3384 self
.assertEqual(persist_root_disk
, None)
3385 mock_volume_keeping_required
.assert_not_called()
3386 self
.assertEqual(disk_list
, [])
3388 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3389 def test_find_persistent_root_volumes_keeping_is_not_required(
3390 self
, mock_volume_keeping_required
3392 """Find persistent root volume, volume keeping is not required."""
3393 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3394 vnfd
["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
3395 {"key": "keep-volume", "value": "false"},
3397 target_vdu
= target_vdu_wth_persistent_storage
3398 vdu_instantiation_volumes_list
= []
3400 mock_volume_keeping_required
.return_value
= False
3401 expected_root_disk
= {
3402 "id": "persistent-root-volume",
3403 "type-of-storage": "persistent-storage:persistent-storage",
3404 "size-of-storage": "10",
3405 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
3407 expected_persist_root_disk
= {
3408 "persistent-root-volume": {
3409 "image_id": "ubuntu20.04",
3414 expected_disk_list
= [
3416 "image_id": "ubuntu20.04",
3421 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3422 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3424 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3425 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3426 self
.assertEqual(disk_list
, expected_disk_list
)
3427 self
.assertEqual(len(disk_list
), 1)
3429 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3430 def test_find_persistent_root_volumes_target_vdu_mismatch(
3431 self
, mock_volume_keeping_required
3433 """Find persistent root volume, target vdu name is not matching."""
3434 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3435 vnfd
["vdu"][0]["name"] = "Several_Volumes-VM"
3436 target_vdu
= target_vdu_wth_persistent_storage
3437 vdu_instantiation_volumes_list
= []
3439 result
= self
.ns
.find_persistent_root_volumes(
3440 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3442 self
.assertEqual(result
, None)
3443 mock_volume_keeping_required
.assert_not_called()
3444 self
.assertEqual(disk_list
, [])
3445 self
.assertEqual(len(disk_list
), 0)
3447 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3448 def test_find_persistent_root_volumes_with_instantiation_vol_list(
3449 self
, mock_volume_keeping_required
3451 """Find persistent root volume, existing volume needs to be used."""
3452 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3453 target_vdu
= target_vdu_wth_persistent_storage
3454 vdu_instantiation_volumes_list
= [
3456 "vim-volume-id": vim_volume_id
,
3457 "name": "persistent-root-volume",
3461 expected_persist_root_disk
= {
3462 "persistent-root-volume": {
3463 "vim_volume_id": vim_volume_id
,
3464 "image_id": "ubuntu20.04",
3467 expected_disk_list
= [
3469 "vim_volume_id": vim_volume_id
,
3470 "image_id": "ubuntu20.04",
3473 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3474 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3476 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3477 mock_volume_keeping_required
.assert_not_called()
3478 self
.assertEqual(disk_list
, expected_disk_list
)
3479 self
.assertEqual(len(disk_list
), 1)
3481 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3482 def test_find_persistent_root_volumes_invalid_instantiation_params(
3483 self
, mock_volume_keeping_required
3485 """Find persistent root volume, existing volume id keyword is invalid."""
3486 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3487 target_vdu
= target_vdu_wth_persistent_storage
3488 vdu_instantiation_volumes_list
= [
3490 "volume-id": vim_volume_id
,
3491 "name": "persistent-root-volume",
3495 with self
.assertRaises(KeyError):
3496 self
.ns
.find_persistent_root_volumes(
3497 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3499 mock_volume_keeping_required
.assert_not_called()
3500 self
.assertEqual(disk_list
, [])
3501 self
.assertEqual(len(disk_list
), 0)
3503 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3504 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3505 self
, mock_volume_keeping_required
3507 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3508 persistent_root_disk
= {
3509 "persistent-root-volume": {
3510 "image_id": "ubuntu20.04",
3515 mock_volume_keeping_required
.return_value
= False
3516 target_vdu
= target_vdu_wth_persistent_storage
3517 vdu_instantiation_volumes_list
= []
3520 "image_id": "ubuntu20.04",
3526 "id": "persistent-volume2",
3527 "size-of-storage": "10",
3528 "type-of-storage": "persistent-storage:persistent-storage",
3530 expected_disk_list
= [
3532 "image_id": "ubuntu20.04",
3541 self
.ns
.find_persistent_volumes(
3542 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3544 self
.assertEqual(disk_list
, expected_disk_list
)
3545 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3547 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3548 def test_find_persistent_volumes_vdu_wth_inst_vol_list(
3549 self
, mock_volume_keeping_required
3551 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3552 persistent_root_disk
= {
3553 "persistent-root-volume": {
3554 "image_id": "ubuntu20.04",
3559 vdu_instantiation_volumes_list
= [
3561 "vim-volume-id": vim_volume_id
,
3562 "name": "persistent-volume2",
3565 target_vdu
= target_vdu_wth_persistent_storage
3568 "image_id": "ubuntu20.04",
3573 expected_disk_list
= [
3575 "image_id": "ubuntu20.04",
3580 "vim_volume_id": vim_volume_id
,
3583 self
.ns
.find_persistent_volumes(
3584 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3586 self
.assertEqual(disk_list
, expected_disk_list
)
3587 mock_volume_keeping_required
.assert_not_called()
3589 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3590 def test_find_persistent_volumes_vdu_wthout_persistent_storage(
3591 self
, mock_volume_keeping_required
3593 """Find persistent ordinary volume, there is not any persistent disk."""
3594 persistent_root_disk
= {}
3595 vdu_instantiation_volumes_list
= []
3596 mock_volume_keeping_required
.return_value
= False
3597 target_vdu
= target_vdu_wthout_persistent_storage
3599 self
.ns
.find_persistent_volumes(
3600 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3602 self
.assertEqual(disk_list
, disk_list
)
3603 mock_volume_keeping_required
.assert_not_called()
3605 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3606 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3607 self
, mock_volume_keeping_required
3609 """There is persistent root disk, but there is not ordinary persistent disk."""
3610 persistent_root_disk
= {
3611 "persistent-root-volume": {
3612 "image_id": "ubuntu20.04",
3617 vdu_instantiation_volumes_list
= []
3618 mock_volume_keeping_required
.return_value
= False
3619 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3620 target_vdu
["virtual-storages"] = [
3622 "id": "persistent-root-volume",
3623 "size-of-storage": "10",
3624 "type-of-storage": "persistent-storage:persistent-storage",
3625 "vdu-storage-requirements": [
3626 {"key": "keep-volume", "value": "true"},
3630 "id": "ephemeral-volume",
3631 "size-of-storage": "1",
3632 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3637 "image_id": "ubuntu20.04",
3642 self
.ns
.find_persistent_volumes(
3643 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3645 self
.assertEqual(disk_list
, disk_list
)
3646 mock_volume_keeping_required
.assert_not_called()
3648 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3649 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
3650 self
, mock_volume_keeping_required
3652 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3653 vim-volume-id is given as instantiation parameter but disk id is not matching."""
3654 mock_volume_keeping_required
.return_value
= True
3655 vdu_instantiation_volumes_list
= [
3657 "vim-volume-id": vim_volume_id
,
3658 "name": "persistent-volume3",
3661 persistent_root_disk
= {
3662 "persistent-root-volume": {
3663 "image_id": "ubuntu20.04",
3670 "image_id": "ubuntu20.04",
3675 expected_disk_list
= [
3677 "image_id": "ubuntu20.04",
3687 "id": "persistent-volume2",
3688 "size-of-storage": "10",
3689 "type-of-storage": "persistent-storage:persistent-storage",
3691 target_vdu
= target_vdu_wth_persistent_storage
3692 self
.ns
.find_persistent_volumes(
3693 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3695 self
.assertEqual(disk_list
, expected_disk_list
)
3696 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3698 def test_is_volume_keeping_required_true(self
):
3699 """Volume keeping is required."""
3700 virtual_storage_descriptor
= {
3701 "id": "persistent-root-volume",
3702 "type-of-storage": "persistent-storage:persistent-storage",
3703 "size-of-storage": "10",
3704 "vdu-storage-requirements": [
3705 {"key": "keep-volume", "value": "true"},
3708 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3709 self
.assertEqual(result
, True)
3711 def test_is_volume_keeping_required_false(self
):
3712 """Volume keeping is not required."""
3713 virtual_storage_descriptor
= {
3714 "id": "persistent-root-volume",
3715 "type-of-storage": "persistent-storage:persistent-storage",
3716 "size-of-storage": "10",
3717 "vdu-storage-requirements": [
3718 {"key": "keep-volume", "value": "false"},
3721 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3722 self
.assertEqual(result
, False)
3724 def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self
):
3725 """Volume keeping is not required, vdu-storage-requirements key does not exist."""
3726 virtual_storage_descriptor
= {
3727 "id": "persistent-root-volume",
3728 "type-of-storage": "persistent-storage:persistent-storage",
3729 "size-of-storage": "10",
3731 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3732 self
.assertEqual(result
, False)
3734 def test_is_volume_keeping_required_wrong_keyword(self
):
3735 """vdu-storage-requirements key to indicate keeping-volume is wrong."""
3736 virtual_storage_descriptor
= {
3737 "id": "persistent-root-volume",
3738 "type-of-storage": "persistent-storage:persistent-storage",
3739 "size-of-storage": "10",
3740 "vdu-storage-requirements": [
3741 {"key": "hold-volume", "value": "true"},
3744 result
= self
.ns
.is_volume_keeping_required(virtual_storage_descriptor
)
3745 self
.assertEqual(result
, False)
3747 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3748 """Interfaces are sorted according to position, all have positions."""
3749 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3750 target_vdu
["interfaces"] = [
3753 "ns-vld-id": "datanet",
3758 "ns-vld-id": "mgmtnet",
3762 sorted_interfaces
= [
3765 "ns-vld-id": "mgmtnet",
3770 "ns-vld-id": "datanet",
3774 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3775 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3777 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3778 """Interfaces are sorted according to position, some of them have positions."""
3779 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3780 target_vdu
["interfaces"] = [
3783 "ns-vld-id": "mgmtnet",
3787 "ns-vld-id": "datanet",
3791 sorted_interfaces
= [
3794 "ns-vld-id": "datanet",
3799 "ns-vld-id": "mgmtnet",
3802 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3803 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3805 def test_sort_vdu_interfaces_position_empty_interface_list(self
):
3806 """Interface list is empty."""
3807 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3808 target_vdu
["interfaces"] = []
3809 sorted_interfaces
= []
3810 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3811 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3813 def test_partially_locate_vdu_interfaces(self
):
3814 """Some interfaces have positions."""
3815 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3816 target_vdu
["interfaces"] = [
3819 "ns-vld-id": "net1",
3821 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3824 "ns-vld-id": "mgmtnet",
3828 "ns-vld-id": "datanet",
3832 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3833 self
.assertDictEqual(
3834 target_vdu
["interfaces"][0],
3837 "ns-vld-id": "datanet",
3841 self
.assertDictEqual(
3842 target_vdu
["interfaces"][2],
3843 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3846 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3847 """Some interfaces have positions, position start from 0."""
3848 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3849 target_vdu
["interfaces"] = [
3852 "ns-vld-id": "net1",
3854 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3857 "ns-vld-id": "mgmtnet",
3861 "ns-vld-id": "datanet",
3865 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3866 self
.assertDictEqual(
3867 target_vdu
["interfaces"][0],
3870 "ns-vld-id": "datanet",
3874 self
.assertDictEqual(
3875 target_vdu
["interfaces"][3],
3876 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3879 def test_partially_locate_vdu_interfaces_wthout_position(self
):
3880 """Interfaces do not have positions."""
3881 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3882 target_vdu
["interfaces"] = interfaces_wthout_positions
3883 expected_result
= deepcopy(target_vdu
["interfaces"])
3884 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3885 self
.assertEqual(target_vdu
["interfaces"], expected_result
)
3887 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3888 """All interfaces have position."""
3889 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3890 target_vdu
["interfaces"] = interfaces_wth_all_positions
3891 expected_interfaces
= [
3894 "ns-vld-id": "net2",
3899 "ns-vld-id": "mgmtnet",
3904 "ns-vld-id": "net1",
3908 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3909 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3911 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3912 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3913 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3914 """Target_vdu has cloud-init and boot-data-drive."""
3915 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3916 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3917 target_vdu
["boot-data-drive"] = "vda"
3919 mock_get_cloud_init
.return_value
= cloud_init_content
3920 mock_parse_jinja2
.return_value
= user_data
3922 "user-data": user_data
,
3923 "boot-data-drive": "vda",
3925 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3926 self
.assertDictEqual(result
, expected_result
)
3927 mock_get_cloud_init
.assert_called_once_with(
3928 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3930 mock_parse_jinja2
.assert_called_once_with(
3931 cloud_init_content
=cloud_init_content
,
3933 context
="sample-cloud-init-path",
3936 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3937 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3938 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3939 self
, mock_parse_jinja2
, mock_get_cloud_init
3941 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3942 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3943 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3944 target_vdu
["boot-data-drive"] = "vda"
3946 mock_get_cloud_init
.side_effect
= NsException(
3947 "Mismatch descriptor for cloud init."
3950 with self
.assertRaises(NsException
) as err
:
3951 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3952 self
.assertEqual(str(err
.exception
), "Mismatch descriptor for cloud init.")
3954 mock_get_cloud_init
.assert_called_once_with(
3955 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3957 mock_parse_jinja2
.assert_not_called()
3959 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3960 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3961 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3962 self
, mock_parse_jinja2
, mock_get_cloud_init
3964 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3965 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3966 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3967 target_vdu
["boot-data-drive"] = "vda"
3969 mock_get_cloud_init
.return_value
= cloud_init_content
3970 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3972 with self
.assertRaises(NsException
) as err
:
3973 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3974 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3975 mock_get_cloud_init
.assert_called_once_with(
3976 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3978 mock_parse_jinja2
.assert_called_once_with(
3979 cloud_init_content
=cloud_init_content
,
3981 context
="sample-cloud-init-path",
3984 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3985 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3986 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3987 self
, mock_parse_jinja2
, mock_get_cloud_init
3989 """Target_vdu has cloud-init but do not have boot-data-drive."""
3990 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3991 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3993 mock_get_cloud_init
.return_value
= cloud_init_content
3994 mock_parse_jinja2
.return_value
= user_data
3996 "user-data": user_data
,
3998 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3999 self
.assertDictEqual(result
, expected_result
)
4000 mock_get_cloud_init
.assert_called_once_with(
4001 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
4003 mock_parse_jinja2
.assert_called_once_with(
4004 cloud_init_content
=cloud_init_content
,
4006 context
="sample-cloud-init-path",
4009 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4010 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4011 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
4012 self
, mock_parse_jinja2
, mock_get_cloud_init
4014 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
4015 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4016 target_vdu
["cloud-init"] = "sample-cloud-init-path"
4017 target_vdu
["boot-data-drive"] = "vda"
4018 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
4019 mock_parse_jinja2
.return_value
= user_data
4021 "user-data": user_data
,
4022 "boot-data-drive": "vda",
4024 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4025 self
.assertDictEqual(result
, expected_result
)
4026 mock_get_cloud_init
.assert_not_called()
4027 mock_parse_jinja2
.assert_called_once_with(
4028 cloud_init_content
=cloud_init_content
,
4030 context
="sample-cloud-init-path",
4033 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
4034 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
4035 def test_prepare_vdu_cloud_init_no_cloud_init(
4036 self
, mock_parse_jinja2
, mock_get_cloud_init
4038 """Target_vdu do not have cloud-init."""
4039 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4040 target_vdu
["boot-data-drive"] = "vda"
4043 "boot-data-drive": "vda",
4045 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
4046 self
.assertDictEqual(result
, expected_result
)
4047 mock_get_cloud_init
.assert_not_called()
4048 mock_parse_jinja2
.assert_not_called()
4050 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
4051 """ns_vld and vnf_vld both exist."""
4054 "ns-vld-id": "mgmtnet",
4055 "vnf-vld-id": "mgmt_cp_int",
4057 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
4058 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4059 interface
, ns_preffix
, vnf_preffix
4061 self
.assertEqual(result
, expected_result
)
4063 def test_check_vld_information_of_interfaces_empty_interfaces(self
):
4064 """Interface dict is empty."""
4066 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4067 interface
, ns_preffix
, vnf_preffix
4069 self
.assertEqual(result
, "")
4071 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
4072 """Interface dict has only vnf_vld."""
4075 "vnf-vld-id": "mgmt_cp_int",
4077 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
4078 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
4079 interface
, ns_preffix
, vnf_preffix
4081 self
.assertEqual(result
, expected_result
)
4083 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
4086 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
4089 "vnf-vld-id": "mgmt_cp_int",
4092 with self
.assertRaises(Exception) as err
:
4093 self
.ns
._check
_vld
_information
_of
_interfaces
(
4094 interface
, ns_preffix
, vnf_preffix
4096 self
.assertEqual(type(err
), TypeError)
4098 def test_prepare_interface_port_security_has_security_details(self
):
4099 """Interface dict has port security details."""
4102 "ns-vld-id": "mgmtnet",
4103 "vnf-vld-id": "mgmt_cp_int",
4104 "port-security-enabled": True,
4105 "port-security-disable-strategy": "allow-address-pairs",
4107 expected_interface
= {
4109 "ns-vld-id": "mgmtnet",
4110 "vnf-vld-id": "mgmt_cp_int",
4111 "port_security": True,
4112 "port_security_disable_strategy": "allow-address-pairs",
4114 self
.ns
._prepare
_interface
_port
_security
(interface
)
4115 self
.assertDictEqual(interface
, expected_interface
)
4117 def test_prepare_interface_port_security_empty_interfaces(self
):
4118 """Interface dict is empty."""
4120 expected_interface
= {}
4121 self
.ns
._prepare
_interface
_port
_security
(interface
)
4122 self
.assertDictEqual(interface
, expected_interface
)
4124 def test_prepare_interface_port_security_wthout_port_security(self
):
4125 """Interface dict does not have port security details."""
4128 "ns-vld-id": "mgmtnet",
4129 "vnf-vld-id": "mgmt_cp_int",
4131 expected_interface
= {
4133 "ns-vld-id": "mgmtnet",
4134 "vnf-vld-id": "mgmt_cp_int",
4136 self
.ns
._prepare
_interface
_port
_security
(interface
)
4137 self
.assertDictEqual(interface
, expected_interface
)
4139 def test_create_net_item_of_interface_floating_ip_port_security(self
):
4140 """Interface dict has floating ip, port-security details."""
4143 "vcpi": "sample_vcpi",
4144 "port_security": True,
4145 "port_security_disable_strategy": "allow-address-pairs",
4146 "floating_ip": "10.1.1.12",
4147 "ns-vld-id": "mgmtnet",
4148 "vnf-vld-id": "mgmt_cp_int",
4150 net_text
= f
"{ns_preffix}"
4151 expected_net_item
= {
4153 "port_security": True,
4154 "port_security_disable_strategy": "allow-address-pairs",
4155 "floating_ip": "10.1.1.12",
4156 "net_id": f
"TASK-{ns_preffix}",
4159 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4160 self
.assertDictEqual(result
, expected_net_item
)
4162 def test_create_net_item_of_interface_invalid_net_text(self
):
4163 """net-text is invalid."""
4166 "vcpi": "sample_vcpi",
4167 "port_security": True,
4168 "port_security_disable_strategy": "allow-address-pairs",
4169 "floating_ip": "10.1.1.12",
4170 "ns-vld-id": "mgmtnet",
4171 "vnf-vld-id": "mgmt_cp_int",
4174 with self
.assertRaises(TypeError):
4175 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4177 def test_create_net_item_of_interface_empty_interface(self
):
4178 """Interface dict is empty."""
4180 net_text
= ns_preffix
4181 expected_net_item
= {
4182 "net_id": f
"TASK-{ns_preffix}",
4185 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
4186 self
.assertDictEqual(result
, expected_net_item
)
4188 @patch("osm_ng_ro.ns.deep_get")
4189 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
4190 """Interface type is SR-IOV."""
4193 "vcpi": "sample_vcpi",
4194 "port_security": True,
4195 "port_security_disable_strategy": "allow-address-pairs",
4196 "floating_ip": "10.1.1.12",
4197 "ns-vld-id": "mgmtnet",
4198 "vnf-vld-id": "mgmt_cp_int",
4201 mock_deep_get
.return_value
= "SR-IOV"
4202 net_text
= ns_preffix
4204 expected_net_item
= {
4209 self
.ns
._prepare
_type
_of
_interface
(
4210 interface
, tasks_by_target_record_id
, net_text
, net_item
4212 self
.assertDictEqual(net_item
, expected_net_item
)
4215 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
4217 mock_deep_get
.assert_called_once_with(
4218 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4221 @patch("osm_ng_ro.ns.deep_get")
4222 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4225 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4228 "vcpi": "sample_vcpi",
4229 "port_security": True,
4230 "port_security_disable_strategy": "allow-address-pairs",
4231 "floating_ip": "10.1.1.12",
4232 "ns-vld-id": "mgmtnet",
4233 "vnf-vld-id": "mgmt_cp_int",
4234 "type": "PCI-PASSTHROUGH",
4236 mock_deep_get
.return_value
= {}
4237 tasks_by_target_record_id
= {}
4238 net_text
= ns_preffix
4240 expected_net_item
= {
4242 "model": "PCI-PASSTHROUGH",
4243 "type": "PCI-PASSTHROUGH",
4245 self
.ns
._prepare
_type
_of
_interface
(
4246 interface
, tasks_by_target_record_id
, net_text
, net_item
4248 self
.assertDictEqual(net_item
, expected_net_item
)
4249 mock_deep_get
.assert_called_once_with(
4250 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4253 @patch("osm_ng_ro.ns.deep_get")
4254 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4255 """Interface type is mgmt."""
4258 "vcpi": "sample_vcpi",
4259 "port_security": True,
4260 "port_security_disable_strategy": "allow-address-pairs",
4261 "floating_ip": "10.1.1.12",
4262 "ns-vld-id": "mgmtnet",
4263 "vnf-vld-id": "mgmt_cp_int",
4266 tasks_by_target_record_id
= {}
4267 net_text
= ns_preffix
4269 expected_net_item
= {
4272 self
.ns
._prepare
_type
_of
_interface
(
4273 interface
, tasks_by_target_record_id
, net_text
, net_item
4275 self
.assertDictEqual(net_item
, expected_net_item
)
4276 mock_deep_get
.assert_not_called()
4278 @patch("osm_ng_ro.ns.deep_get")
4279 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4280 """Interface type is bridge."""
4283 "vcpi": "sample_vcpi",
4284 "port_security": True,
4285 "port_security_disable_strategy": "allow-address-pairs",
4286 "floating_ip": "10.1.1.12",
4287 "ns-vld-id": "mgmtnet",
4288 "vnf-vld-id": "mgmt_cp_int",
4290 tasks_by_target_record_id
= {}
4291 net_text
= ns_preffix
4293 expected_net_item
= {
4297 self
.ns
._prepare
_type
_of
_interface
(
4298 interface
, tasks_by_target_record_id
, net_text
, net_item
4300 self
.assertDictEqual(net_item
, expected_net_item
)
4301 mock_deep_get
.assert_not_called()
4303 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4304 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4305 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4306 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4307 def test_prepare_vdu_interfaces(
4309 mock_type_of_interface
,
4310 mock_item_of_interface
,
4312 mock_vld_information_of_interface
,
4314 """Prepare vdu interfaces successfully."""
4315 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4318 "ns-vld-id": "net1",
4319 "ip-address": "13.2.12.31",
4320 "mgmt-interface": True,
4324 "vnf-vld-id": "net2",
4325 "mac-address": "d0:94:66:ed:fc:e2",
4329 "ns-vld-id": "mgmtnet",
4331 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4333 "params": "test_params",
4334 "find_params": "test_find_params",
4338 net_text_1
= f
"{ns_preffix}:net1"
4339 net_text_2
= f
"{vnf_preffix}:net2"
4340 net_text_3
= f
"{ns_preffix}:mgmtnet"
4343 "net_id": f
"TASK-{ns_preffix}",
4348 "net_id": f
"TASK-{ns_preffix}",
4353 "net_id": f
"TASK-{ns_preffix}",
4356 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4357 mock_vld_information_of_interface
.side_effect
= [
4363 expected_extra_dict
= {
4364 "params": "test_params",
4365 "find_params": "test_find_params",
4366 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4367 "mgmt_vdu_interface": 0,
4369 updated_net_item1
= deepcopy(net_item_1
)
4370 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4371 updated_net_item2
= deepcopy(net_item_2
)
4372 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4373 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4374 self
.ns
._prepare
_vdu
_interfaces
(
4380 tasks_by_target_record_id
,
4383 _call_mock_vld_information_of_interface
= (
4384 mock_vld_information_of_interface
.call_args_list
4387 _call_mock_vld_information_of_interface
[0][0],
4388 (interface_1
, ns_preffix
, vnf_preffix
),
4391 _call_mock_vld_information_of_interface
[1][0],
4392 (interface_2
, ns_preffix
, vnf_preffix
),
4395 _call_mock_vld_information_of_interface
[2][0],
4396 (interface_3
, ns_preffix
, vnf_preffix
),
4399 _call_mock_port_security
= mock_port_security
.call_args_list
4400 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4401 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4402 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4404 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4405 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4406 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4407 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4409 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4411 _call_mock_type_of_interface
[0][0],
4412 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4415 _call_mock_type_of_interface
[1][0],
4416 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4419 _call_mock_type_of_interface
[2][0],
4420 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4422 self
.assertEqual(net_list
, expected_net_list
)
4423 self
.assertEqual(extra_dict
, expected_extra_dict
)
4424 self
.logger
.error
.assert_not_called()
4426 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4427 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4428 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4429 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4430 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4432 mock_type_of_interface
,
4433 mock_item_of_interface
,
4435 mock_vld_information_of_interface
,
4437 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4438 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4441 "ns-vld-id": "net1",
4442 "ip-address": "13.2.12.31",
4443 "mgmt-interface": True,
4447 "vnf-vld-id": "net2",
4448 "mac-address": "d0:94:66:ed:fc:e2",
4452 "ns-vld-id": "mgmtnet",
4454 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4456 "params": "test_params",
4457 "find_params": "test_find_params",
4460 net_text_1
= f
"{ns_preffix}:net1"
4461 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4463 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4465 expected_extra_dict
= {
4466 "params": "test_params",
4467 "find_params": "test_find_params",
4468 "depends_on": [net_text_1
],
4470 with self
.assertRaises(TypeError):
4471 self
.ns
._prepare
_vdu
_interfaces
(
4477 tasks_by_target_record_id
,
4481 _call_mock_vld_information_of_interface
= (
4482 mock_vld_information_of_interface
.call_args_list
4485 _call_mock_vld_information_of_interface
[0][0],
4486 (interface_1
, ns_preffix
, vnf_preffix
),
4489 _call_mock_port_security
= mock_port_security
.call_args_list
4490 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4492 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4493 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4495 mock_type_of_interface
.assert_not_called()
4496 self
.logger
.error
.assert_not_called()
4497 self
.assertEqual(net_list
, [])
4498 self
.assertEqual(extra_dict
, expected_extra_dict
)
4500 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4501 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4502 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4503 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4504 def test_prepare_vdu_interfaces_vld_information_is_empty(
4506 mock_type_of_interface
,
4507 mock_item_of_interface
,
4509 mock_vld_information_of_interface
,
4511 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4512 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4515 "ns-vld-id": "net1",
4516 "ip-address": "13.2.12.31",
4517 "mgmt-interface": True,
4521 "vnf-vld-id": "net2",
4522 "mac-address": "d0:94:66:ed:fc:e2",
4526 "ns-vld-id": "mgmtnet",
4528 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4530 "params": "test_params",
4531 "find_params": "test_find_params",
4534 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4536 self
.ns
._prepare
_vdu
_interfaces
(
4542 tasks_by_target_record_id
,
4546 _call_mock_vld_information_of_interface
= (
4547 mock_vld_information_of_interface
.call_args_list
4550 _call_mock_vld_information_of_interface
[0][0],
4551 (interface_1
, ns_preffix
, vnf_preffix
),
4554 _call_mock_vld_information_of_interface
[1][0],
4555 (interface_2
, ns_preffix
, vnf_preffix
),
4558 _call_mock_vld_information_of_interface
[2][0],
4559 (interface_3
, ns_preffix
, vnf_preffix
),
4562 _call_logger
= self
.logger
.error
.call_args_list
4565 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4569 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4573 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4575 self
.assertEqual(net_list
, [])
4579 "params": "test_params",
4580 "find_params": "test_find_params",
4585 mock_item_of_interface
.assert_not_called()
4586 mock_port_security
.assert_not_called()
4587 mock_type_of_interface
.assert_not_called()
4589 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4590 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4591 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4592 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4593 def test_prepare_vdu_interfaces_empty_interface_list(
4595 mock_type_of_interface
,
4596 mock_item_of_interface
,
4598 mock_vld_information_of_interface
,
4600 """Prepare vdu interfaces, interface list is empty."""
4601 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4602 target_vdu
["interfaces"] = []
4605 self
.ns
._prepare
_vdu
_interfaces
(
4611 tasks_by_target_record_id
,
4614 mock_type_of_interface
.assert_not_called()
4615 mock_vld_information_of_interface
.assert_not_called()
4616 mock_item_of_interface
.assert_not_called()
4617 mock_port_security
.assert_not_called()
4619 def test_prepare_vdu_ssh_keys(self
):
4620 """Target_vdu has ssh-keys and ro_nsr_public_key exists."""
4621 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4622 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4623 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4624 target_vdu
["ssh-access-required"] = True
4626 expected_cloud_config
= {
4627 "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]
4629 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4630 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4632 def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self
):
4633 """Target_vdu does not have ssh-keys."""
4634 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4635 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4636 target_vdu
["ssh-access-required"] = True
4638 expected_cloud_config
= {"key-pairs": [{"public_key": "path_of_public_key"}]}
4639 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4640 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4642 def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self
):
4643 """Target_vdu has ssh-keys, ssh-access is not required."""
4644 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4645 target_vdu
["ssh-keys"] = ["sample-ssh-key"]
4646 ro_nsr_public_key
= {"public_key": "path_of_public_key"}
4647 target_vdu
["ssh-access-required"] = False
4649 expected_cloud_config
= {"key-pairs": ["sample-ssh-key"]}
4650 self
.ns
._prepare
_vdu
_ssh
_keys
(target_vdu
, ro_nsr_public_key
, cloud_config
)
4651 self
.assertDictEqual(cloud_config
, expected_cloud_config
)
4653 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4654 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4655 def test_add_persistent_root_disk_to_disk_list_keep_false(
4656 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4658 """Add persistent root disk to disk_list, keep volume set to False."""
4660 "id": "persistent-root-volume",
4661 "type-of-storage": "persistent-storage:persistent-storage",
4662 "size-of-storage": "10",
4664 mock_select_persistent_root_disk
.return_value
= root_disk
4665 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4666 vnfd
["virtual-storage-desc"][1] = root_disk
4667 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4668 persistent_root_disk
= {}
4670 mock_volume_keeping_required
.return_value
= False
4671 expected_disk_list
= [
4673 "image_id": "ubuntu20.04",
4678 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4679 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4681 self
.assertEqual(disk_list
, expected_disk_list
)
4682 mock_select_persistent_root_disk
.assert_called_once()
4683 mock_volume_keeping_required
.assert_called_once()
4685 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4686 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4687 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4688 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4690 """Add persistent root disk to disk_list"""
4692 "id": "persistent-root-volume",
4693 "type-of-storage": "persistent-storage:persistent-storage",
4694 "size-of-storage": "10",
4696 mock_select_persistent_root_disk
.side_effect
= AttributeError
4697 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4698 vnfd
["virtual-storage-desc"][1] = root_disk
4699 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4700 persistent_root_disk
= {}
4702 with self
.assertRaises(AttributeError):
4703 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4704 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4706 self
.assertEqual(disk_list
, [])
4707 mock_select_persistent_root_disk
.assert_called_once()
4708 mock_volume_keeping_required
.assert_not_called()
4710 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4711 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4712 def test_add_persistent_root_disk_to_disk_list_keep_true(
4713 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4715 """Add persistent root disk, keeo volume set to True."""
4716 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4717 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4718 mock_volume_keeping_required
.return_value
= True
4720 "id": "persistent-root-volume",
4721 "type-of-storage": "persistent-storage:persistent-storage",
4722 "size-of-storage": "10",
4723 "vdu-storage-requirements": [
4724 {"key": "keep-volume", "value": "true"},
4727 mock_select_persistent_root_disk
.return_value
= root_disk
4728 persistent_root_disk
= {}
4730 expected_disk_list
= [
4732 "image_id": "ubuntu20.04",
4737 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4738 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4740 self
.assertEqual(disk_list
, expected_disk_list
)
4741 mock_volume_keeping_required
.assert_called_once_with(root_disk
)
4743 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4744 def test_add_persistent_ordinary_disk_to_disk_list(
4745 self
, mock_volume_keeping_required
4747 """Add persistent ordinary disk, keeo volume set to True."""
4748 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4749 mock_volume_keeping_required
.return_value
= False
4750 persistent_root_disk
= {
4751 "persistent-root-volume": {
4752 "image_id": "ubuntu20.04",
4758 "id": "persistent-volume2",
4759 "type-of-storage": "persistent-storage:persistent-storage",
4760 "size-of-storage": "10",
4762 persistent_ordinary_disk
= {}
4764 expected_disk_list
= [
4770 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4771 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4773 self
.assertEqual(disk_list
, expected_disk_list
)
4774 mock_volume_keeping_required
.assert_called_once_with(ordinary_disk
)
4776 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4777 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
4778 self
, mock_volume_keeping_required
4780 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4781 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4782 mock_volume_keeping_required
.return_value
= False
4783 persistent_root_disk
= {
4784 "persistent-root-volume": {
4785 "image_id": "ubuntu20.04",
4789 "persistent-volume2": {
4793 persistent_ordinary_disk
= {}
4796 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4797 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4799 self
.assertEqual(disk_list
, [])
4800 mock_volume_keeping_required
.assert_not_called()
4802 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4803 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4804 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
4805 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4807 """VNFD does not have persistent storage."""
4808 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4809 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4810 mock_select_persistent_root_disk
.return_value
= None
4811 persistent_root_disk
= {}
4813 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4814 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4816 self
.assertEqual(disk_list
, [])
4817 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4818 mock_volume_keeping_required
.assert_not_called()
4820 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4821 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4822 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
4823 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4825 """Persistent_root_disk dict is empty."""
4826 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
4827 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4828 mock_select_persistent_root_disk
.return_value
= None
4829 persistent_root_disk
= {}
4831 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4832 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4834 self
.assertEqual(disk_list
, [])
4835 self
.assertEqual(mock_select_persistent_root_disk
.call_count
, 2)
4836 mock_volume_keeping_required
.assert_not_called()
4838 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self
):
4839 """Invalid extra dict."""
4840 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4841 target_vdu
["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4843 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4844 with self
.assertRaises(NsException
) as err
:
4845 self
.ns
._prepare
_vdu
_affinity
_group
_list
(target_vdu
, extra_dict
, ns_preffix
)
4846 self
.assertEqual(str(err
.exception
), "Invalid extra_dict format.")
4848 def test_prepare_vdu_affinity_group_list_one_affinity_group(self
):
4849 """There is one affinity-group."""
4850 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4851 target_vdu
["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
4852 extra_dict
= {"depends_on": []}
4853 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4854 affinity_group_txt
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
4855 expected_result
= [{"affinity_group_id": "TASK-" + affinity_group_txt
}]
4856 expected_extra_dict
= {"depends_on": [affinity_group_txt
]}
4857 result
= self
.ns
._prepare
_vdu
_affinity
_group
_list
(
4858 target_vdu
, extra_dict
, ns_preffix
4860 self
.assertDictEqual(extra_dict
, expected_extra_dict
)
4861 self
.assertEqual(result
, expected_result
)
4863 def test_prepare_vdu_affinity_group_list_several_affinity_groups(self
):
4864 """There are two affinity-groups."""
4865 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4866 target_vdu
["affinity-or-anti-affinity-group-id"] = [
4867 "affinity-group-id1",
4868 "affinity-group-id2",
4870 extra_dict
= {"depends_on": []}
4871 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4872 affinity_group_txt1
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1"
4873 affinity_group_txt2
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2"
4875 {"affinity_group_id": "TASK-" + affinity_group_txt1
},
4876 {"affinity_group_id": "TASK-" + affinity_group_txt2
},
4878 expected_extra_dict
= {"depends_on": [affinity_group_txt1
, affinity_group_txt2
]}
4879 result
= self
.ns
._prepare
_vdu
_affinity
_group
_list
(
4880 target_vdu
, extra_dict
, ns_preffix
4882 self
.assertDictEqual(extra_dict
, expected_extra_dict
)
4883 self
.assertEqual(result
, expected_result
)
4885 def test_prepare_vdu_affinity_group_list_no_affinity_group(self
):
4886 """There is not any affinity-group."""
4887 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4888 extra_dict
= {"depends_on": []}
4889 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4890 result
= self
.ns
._prepare
_vdu
_affinity
_group
_list
(
4891 target_vdu
, extra_dict
, ns_preffix
4893 self
.assertDictEqual(extra_dict
, {"depends_on": []})
4894 self
.assertEqual(result
, [])
4896 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4897 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4898 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4899 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4900 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4901 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4902 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4903 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4904 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4905 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4906 def test_process_vdu_params_with_inst_vol_list(
4908 mock_prepare_vdu_affinity_group_list
,
4909 mock_add_persistent_ordinary_disks_to_disk_list
,
4910 mock_add_persistent_root_disk_to_disk_list
,
4911 mock_find_persistent_volumes
,
4912 mock_find_persistent_root_volumes
,
4913 mock_prepare_vdu_ssh_keys
,
4914 mock_prepare_vdu_cloud_init
,
4915 mock_prepare_vdu_interfaces
,
4916 mock_locate_vdu_interfaces
,
4917 mock_sort_vdu_interfaces
,
4919 """Instantiation volume list is empty."""
4920 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4922 target_vdu
["interfaces"] = interfaces_wth_all_positions
4924 vdu_instantiation_vol_list
= [
4926 "vim-volume-id": vim_volume_id
,
4927 "name": "persistent-volume2",
4930 target_vdu
["additionalParams"] = {
4931 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4933 mock_prepare_vdu_cloud_init
.return_value
= {}
4934 mock_prepare_vdu_affinity_group_list
.return_value
= []
4935 persistent_root_disk
= {
4936 "persistent-root-volume": {
4937 "image_id": "ubuntu20.04",
4941 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4943 new_kwargs
= deepcopy(kwargs
)
4948 "tasks_by_target_record_id": {},
4952 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4953 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4954 db
.get_one
.return_value
= vnfd
4955 result
= Ns
._process
_vdu
_params
(
4956 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4958 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4959 mock_locate_vdu_interfaces
.assert_not_called()
4960 mock_prepare_vdu_cloud_init
.assert_called_once()
4961 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4962 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4963 mock_prepare_vdu_interfaces
.assert_called_once_with(
4965 expected_extra_dict_copy
,
4972 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4973 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4974 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4975 mock_find_persistent_volumes
.assert_called_once_with(
4976 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
4979 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4980 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4981 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4982 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4983 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4984 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4985 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4986 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4987 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4988 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4989 def test_process_vdu_params_wth_affinity_groups(
4991 mock_prepare_vdu_affinity_group_list
,
4992 mock_add_persistent_ordinary_disks_to_disk_list
,
4993 mock_add_persistent_root_disk_to_disk_list
,
4994 mock_find_persistent_volumes
,
4995 mock_find_persistent_root_volumes
,
4996 mock_prepare_vdu_ssh_keys
,
4997 mock_prepare_vdu_cloud_init
,
4998 mock_prepare_vdu_interfaces
,
4999 mock_locate_vdu_interfaces
,
5000 mock_sort_vdu_interfaces
,
5002 """There is cloud-config."""
5003 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5006 target_vdu
["interfaces"] = interfaces_wth_all_positions
5007 mock_prepare_vdu_cloud_init
.return_value
= {}
5008 mock_prepare_vdu_affinity_group_list
.return_value
= [
5013 new_kwargs
= deepcopy(kwargs
)
5018 "tasks_by_target_record_id": {},
5022 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5023 expected_extra_dict3
["params"]["affinity_group_list"] = [
5027 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5028 db
.get_one
.return_value
= vnfd
5029 result
= Ns
._process
_vdu
_params
(
5030 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5032 self
.assertDictEqual(result
, expected_extra_dict3
)
5033 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5034 mock_locate_vdu_interfaces
.assert_not_called()
5035 mock_prepare_vdu_cloud_init
.assert_called_once()
5036 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5037 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5038 mock_prepare_vdu_interfaces
.assert_called_once_with(
5040 expected_extra_dict3
,
5048 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5049 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5050 mock_find_persistent_volumes
.assert_not_called()
5052 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5053 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5054 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5055 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5056 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5057 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5058 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5059 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5060 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5061 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5062 def test_process_vdu_params_wth_cloud_config(
5064 mock_prepare_vdu_affinity_group_list
,
5065 mock_add_persistent_ordinary_disks_to_disk_list
,
5066 mock_add_persistent_root_disk_to_disk_list
,
5067 mock_find_persistent_volumes
,
5068 mock_find_persistent_root_volumes
,
5069 mock_prepare_vdu_ssh_keys
,
5070 mock_prepare_vdu_cloud_init
,
5071 mock_prepare_vdu_interfaces
,
5072 mock_locate_vdu_interfaces
,
5073 mock_sort_vdu_interfaces
,
5075 """There is cloud-config."""
5076 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5079 target_vdu
["interfaces"] = interfaces_wth_all_positions
5080 mock_prepare_vdu_cloud_init
.return_value
= {
5081 "user-data": user_data
,
5082 "boot-data-drive": "vda",
5084 mock_prepare_vdu_affinity_group_list
.return_value
= []
5086 new_kwargs
= deepcopy(kwargs
)
5091 "tasks_by_target_record_id": {},
5095 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
5096 expected_extra_dict3
["params"]["cloud_config"] = {
5097 "user-data": user_data
,
5098 "boot-data-drive": "vda",
5100 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5101 db
.get_one
.return_value
= vnfd
5102 result
= Ns
._process
_vdu
_params
(
5103 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5105 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5106 mock_locate_vdu_interfaces
.assert_not_called()
5107 mock_prepare_vdu_cloud_init
.assert_called_once()
5108 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5109 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5110 mock_prepare_vdu_interfaces
.assert_called_once_with(
5112 expected_extra_dict3
,
5119 self
.assertDictEqual(result
, expected_extra_dict3
)
5120 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
5121 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
5123 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5124 mock_find_persistent_volumes
.assert_not_called()
5126 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5127 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5128 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5129 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5130 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5131 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5132 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5133 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5134 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5135 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5136 def test_process_vdu_params_wthout_persistent_storage(
5138 mock_prepare_vdu_affinity_group_list
,
5139 mock_add_persistent_ordinary_disks_to_disk_list
,
5140 mock_add_persistent_root_disk_to_disk_list
,
5141 mock_find_persistent_volumes
,
5142 mock_find_persistent_root_volumes
,
5143 mock_prepare_vdu_ssh_keys
,
5144 mock_prepare_vdu_cloud_init
,
5145 mock_prepare_vdu_interfaces
,
5146 mock_locate_vdu_interfaces
,
5147 mock_sort_vdu_interfaces
,
5149 """There is not any persistent storage."""
5150 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5153 target_vdu
["interfaces"] = interfaces_wth_all_positions
5154 mock_prepare_vdu_cloud_init
.return_value
= {}
5155 mock_prepare_vdu_affinity_group_list
.return_value
= []
5157 new_kwargs
= deepcopy(kwargs
)
5162 "tasks_by_target_record_id": {},
5166 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
5167 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
5168 db
.get_one
.return_value
= vnfd
5169 result
= Ns
._process
_vdu
_params
(
5170 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5172 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5173 mock_locate_vdu_interfaces
.assert_not_called()
5174 mock_prepare_vdu_cloud_init
.assert_called_once()
5175 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5176 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5177 mock_prepare_vdu_interfaces
.assert_called_once_with(
5179 expected_extra_dict_copy
,
5186 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5187 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5188 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5189 mock_find_persistent_volumes
.assert_not_called()
5191 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5192 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5193 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5194 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5195 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5196 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5197 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5198 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5199 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5200 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5201 def test_process_vdu_params_interfaces_partially_located(
5203 mock_prepare_vdu_affinity_group_list
,
5204 mock_add_persistent_ordinary_disks_to_disk_list
,
5205 mock_add_persistent_root_disk_to_disk_list
,
5206 mock_find_persistent_volumes
,
5207 mock_find_persistent_root_volumes
,
5208 mock_prepare_vdu_ssh_keys
,
5209 mock_prepare_vdu_cloud_init
,
5210 mock_prepare_vdu_interfaces
,
5211 mock_locate_vdu_interfaces
,
5212 mock_sort_vdu_interfaces
,
5214 """Some interfaces have position."""
5215 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5218 target_vdu
["interfaces"] = [
5221 "ns-vld-id": "net1",
5223 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5226 "ns-vld-id": "mgmtnet",
5229 mock_prepare_vdu_cloud_init
.return_value
= {}
5230 mock_prepare_vdu_affinity_group_list
.return_value
= []
5231 persistent_root_disk
= {
5232 "persistent-root-volume": {
5233 "image_id": "ubuntu20.04",
5238 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5240 new_kwargs
= deepcopy(kwargs
)
5245 "tasks_by_target_record_id": {},
5250 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5251 db
.get_one
.return_value
= vnfd
5252 result
= Ns
._process
_vdu
_params
(
5253 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5255 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5256 mock_sort_vdu_interfaces
.assert_not_called()
5257 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5258 mock_prepare_vdu_cloud_init
.assert_called_once()
5259 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5260 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5261 mock_prepare_vdu_interfaces
.assert_called_once_with(
5263 expected_extra_dict_copy
,
5270 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5271 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5272 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5273 mock_find_persistent_volumes
.assert_not_called()
5274 mock_find_persistent_root_volumes
.assert_not_called()
5276 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5277 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5278 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5279 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5280 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5281 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5282 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5283 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5284 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5285 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5286 def test_process_vdu_params_no_interface_position(
5288 mock_prepare_vdu_affinity_group_list
,
5289 mock_add_persistent_ordinary_disks_to_disk_list
,
5290 mock_add_persistent_root_disk_to_disk_list
,
5291 mock_find_persistent_volumes
,
5292 mock_find_persistent_root_volumes
,
5293 mock_prepare_vdu_ssh_keys
,
5294 mock_prepare_vdu_cloud_init
,
5295 mock_prepare_vdu_interfaces
,
5296 mock_locate_vdu_interfaces
,
5297 mock_sort_vdu_interfaces
,
5299 """Interfaces do not have position."""
5300 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5303 target_vdu
["interfaces"] = interfaces_wthout_positions
5304 mock_prepare_vdu_cloud_init
.return_value
= {}
5305 mock_prepare_vdu_affinity_group_list
.return_value
= []
5306 persistent_root_disk
= {
5307 "persistent-root-volume": {
5308 "image_id": "ubuntu20.04",
5313 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5314 new_kwargs
= deepcopy(kwargs
)
5319 "tasks_by_target_record_id": {},
5324 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5325 db
.get_one
.return_value
= vnfd
5326 result
= Ns
._process
_vdu
_params
(
5327 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5329 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5330 mock_sort_vdu_interfaces
.assert_not_called()
5331 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5332 mock_prepare_vdu_cloud_init
.assert_called_once()
5333 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5334 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5335 mock_prepare_vdu_interfaces
.assert_called_once_with(
5337 expected_extra_dict_copy
,
5344 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5345 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5346 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5347 mock_find_persistent_volumes
.assert_not_called()
5348 mock_find_persistent_root_volumes
.assert_not_called()
5350 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5351 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5352 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5353 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5354 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5355 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5356 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5357 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5358 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5359 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5360 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5362 mock_prepare_vdu_affinity_group_list
,
5363 mock_add_persistent_ordinary_disks_to_disk_list
,
5364 mock_add_persistent_root_disk_to_disk_list
,
5365 mock_find_persistent_volumes
,
5366 mock_find_persistent_root_volumes
,
5367 mock_prepare_vdu_ssh_keys
,
5368 mock_prepare_vdu_cloud_init
,
5369 mock_prepare_vdu_interfaces
,
5370 mock_locate_vdu_interfaces
,
5371 mock_sort_vdu_interfaces
,
5373 """Prepare vdu interfaces method raises exception."""
5374 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5377 target_vdu
["interfaces"] = interfaces_wthout_positions
5378 mock_prepare_vdu_cloud_init
.return_value
= {}
5379 mock_prepare_vdu_affinity_group_list
.return_value
= []
5380 persistent_root_disk
= {
5381 "persistent-root-volume": {
5382 "image_id": "ubuntu20.04",
5387 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5388 new_kwargs
= deepcopy(kwargs
)
5393 "tasks_by_target_record_id": {},
5397 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5399 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5400 db
.get_one
.return_value
= vnfd
5401 with self
.assertRaises(Exception) as err
:
5402 Ns
._process
_vdu
_params
(
5403 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5405 self
.assertEqual(type(err
), TypeError)
5406 mock_sort_vdu_interfaces
.assert_not_called()
5407 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5408 mock_prepare_vdu_cloud_init
.assert_not_called()
5409 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5410 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5411 mock_prepare_vdu_interfaces
.assert_called_once()
5412 mock_prepare_vdu_ssh_keys
.assert_not_called()
5413 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5414 mock_find_persistent_volumes
.assert_not_called()
5415 mock_find_persistent_root_volumes
.assert_not_called()
5417 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5418 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5419 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5420 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5421 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5422 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5423 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5424 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5425 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5426 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5427 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5429 mock_prepare_vdu_affinity_group_list
,
5430 mock_add_persistent_ordinary_disks_to_disk_list
,
5431 mock_add_persistent_root_disk_to_disk_list
,
5432 mock_find_persistent_volumes
,
5433 mock_find_persistent_root_volumes
,
5434 mock_prepare_vdu_ssh_keys
,
5435 mock_prepare_vdu_cloud_init
,
5436 mock_prepare_vdu_interfaces
,
5437 mock_locate_vdu_interfaces
,
5438 mock_sort_vdu_interfaces
,
5440 """Add persistent root disk method raises exception."""
5441 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5444 target_vdu
["interfaces"] = interfaces_wthout_positions
5445 mock_prepare_vdu_cloud_init
.return_value
= {}
5446 mock_prepare_vdu_affinity_group_list
.return_value
= []
5447 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5448 new_kwargs
= deepcopy(kwargs
)
5453 "tasks_by_target_record_id": {},
5458 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5459 db
.get_one
.return_value
= vnfd
5460 with self
.assertRaises(Exception) as err
:
5461 Ns
._process
_vdu
_params
(
5462 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5464 self
.assertEqual(type(err
), KeyError)
5465 mock_sort_vdu_interfaces
.assert_not_called()
5466 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5467 mock_prepare_vdu_cloud_init
.assert_called_once()
5468 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5469 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5470 mock_prepare_vdu_interfaces
.assert_called_once_with(
5474 f
"{ns_preffix}:image.0",
5475 f
"{ns_preffix}:flavor.0",
5485 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5486 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5487 mock_find_persistent_volumes
.assert_not_called()
5488 mock_find_persistent_root_volumes
.assert_not_called()
5490 def test_select_persistent_root_disk(self
):
5491 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5492 vdu
["virtual-storage-desc"] = [
5493 "persistent-root-volume",
5494 "persistent-volume2",
5497 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5498 expected_result
= vsd
5499 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5500 self
.assertEqual(result
, expected_result
)
5502 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5503 """VDU first virtual-storage-desc is different than vsd id."""
5504 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5505 vdu
["virtual-storage-desc"] = [
5506 "persistent-volume2",
5507 "persistent-root-volume",
5510 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5511 expected_result
= None
5512 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5513 self
.assertEqual(result
, expected_result
)
5515 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5516 """vsd type is not persistent."""
5517 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5518 vdu
["virtual-storage-desc"] = [
5519 "persistent-volume2",
5520 "persistent-root-volume",
5523 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5524 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5525 expected_result
= None
5526 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5527 self
.assertEqual(result
, expected_result
)
5529 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5530 """vsd size is None."""
5531 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5532 vdu
["virtual-storage-desc"] = [
5533 "persistent-volume2",
5534 "persistent-root-volume",
5537 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5538 vsd
["size-of-storage"] = None
5539 expected_result
= None
5540 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5541 self
.assertEqual(result
, expected_result
)
5543 def test_select_persistent_root_disk_vdu_wthout_vsd(self
):
5544 """VDU does not have virtual-storage-desc."""
5545 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5546 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5547 expected_result
= None
5548 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5549 self
.assertEqual(result
, expected_result
)
5551 def test_select_persistent_root_disk_invalid_vsd_type(self
):
5552 """vsd is list, expected to be a dict."""
5553 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5554 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"]
5555 with self
.assertRaises(AttributeError):
5556 Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)