1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy
import deepcopy
19 from unittest
.mock
import MagicMock
, Mock
, patch
29 from osm_ng_ro
.ns
import Ns
, NsException
32 __author__
= "Eduardo Sousa"
33 __date__
= "$19-NOV-2021 00:00:00$"
36 # Variables used in Tests
37 vnfd_wth_persistent_storage
= {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
56 "virtual-storage-desc": [
57 "persistent-root-volume",
64 "virtual-storage-desc": [
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
74 "vdu-storage-requirements": [
75 {"key": "keep-volume", "value": "true"},
79 "id": "ephemeral-volume",
80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
81 "size-of-storage": "1",
87 "path": "/app/storage/",
92 vim_volume_id
= "ru937f49-3870-4169-b758-9732e1ff40f3"
93 task_by_target_record_id
= {
94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
95 "extra_dict": {"params": {"net_type": "SR-IOV"}}
98 interfaces_wthout_positions
= [
109 "ns-vld-id": "mgmtnet",
112 interfaces_wth_all_positions
= [
125 "ns-vld-id": "mgmtnet",
129 target_vdu_wth_persistent_storage
= {
130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
133 "vdu-name": "several_volumes-VM",
137 "ns-vld-id": "mgmtnet",
140 "virtual-storages": [
142 "id": "persistent-volume2",
143 "size-of-storage": "10",
144 "type-of-storage": "persistent-storage:persistent-storage",
147 "id": "persistent-root-volume",
148 "size-of-storage": "10",
149 "type-of-storage": "persistent-storage:persistent-storage",
150 "vdu-storage-requirements": [
151 {"key": "keep-volume", "value": "true"},
155 "id": "ephemeral-volume",
156 "size-of-storage": "1",
157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
# Module-level mocks shared by the tests. Each mock gets its own name so that
# reprs and assertion-failure messages identify which one was involved.
db = MagicMock(name="database mock")
fs = MagicMock(name="fs mock")

# Identifiers reused by the fixtures and expected results in this module.
# The "preffix" spelling is kept because other code refers to these names.
ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
168 "name": "sample_name",
170 expected_extra_dict
= {
172 f
"{ns_preffix}:image.0",
173 f
"{ns_preffix}:flavor.0",
176 "affinity_group_list": [],
177 "availability_zone_index": None,
178 "availability_zone_list": None,
179 "cloud_config": None,
180 "description": "several_volumes-VM",
182 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
183 "image_id": f
"TASK-{ns_preffix}:image.0",
184 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
190 expected_extra_dict2
= {
192 f
"{ns_preffix}:image.0",
193 f
"{ns_preffix}:flavor.0",
196 "affinity_group_list": [],
197 "availability_zone_index": None,
198 "availability_zone_list": None,
199 "cloud_config": None,
200 "description": "without_volumes-VM",
202 "flavor_id": f
"TASK-{ns_preffix}:flavor.0",
203 "image_id": f
"TASK-{ns_preffix}:image.0",
204 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
210 expected_extra_dict3
= {
212 f
"{ns_preffix}:image.0",
215 "affinity_group_list": [],
216 "availability_zone_index": None,
217 "availability_zone_list": None,
218 "cloud_config": None,
219 "description": "without_volumes-VM",
221 "flavor_id": "flavor_test",
222 "image_id": f
"TASK-{ns_preffix}:image.0",
223 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
228 tasks_by_target_record_id
= {
229 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
232 "net_type": "SR-IOV",
239 "vdu2cloud_init": {},
241 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
242 "member-vnf-index-ref": "vnf-several-volumes",
245 vnfd_wthout_persistent_storage
= {
246 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
250 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
253 "id": "without_volumes-vnf",
254 "product-name": "without_volumes-vnf",
257 "id": "without_volumes-VM",
258 "name": "without_volumes-VM",
259 "sw-image-desc": "ubuntu20.04",
260 "alternative-sw-image-desc": [
264 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
268 "virtual-storage-desc": [
269 {"id": "root-volume", "size-of-storage": "10"},
271 "id": "ephemeral-volume",
272 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
273 "size-of-storage": "1",
279 "path": "/app/storage/",
285 target_vdu_wthout_persistent_storage
= {
286 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
289 "vdu-name": "without_volumes-VM",
293 "ns-vld-id": "mgmtnet",
296 "virtual-storages": [
299 "size-of-storage": "10",
302 "id": "ephemeral-volume",
303 "size-of-storage": "1",
304 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
308 cloud_init_content
= """
313 overwrite: {{is_override}}
316 - [ sh, -xc, "echo $(date) '{{command}}'" ]
327 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
class CopyingMock(MagicMock):
    """MagicMock subclass that deep-copies the arguments of every call.

    The copies, not the caller's objects, are what get passed on to
    ``MagicMock.__call__``, so the recorded call arguments are unaffected by
    any later mutation of the originals.
    """

    def __call__(self, *args, **kwargs):
        """Deep-copy ``args`` and ``kwargs``, then delegate to the parent mock."""
        # Positional and keyword arguments are copied independently.
        snapshot_args = deepcopy(args)
        snapshot_kwargs = deepcopy(kwargs)
        return super().__call__(*snapshot_args, **snapshot_kwargs)
338 class TestNs(unittest
.TestCase
):
342 def test__create_task_without_extra_dict(self
):
344 "target_id": "vim_openstack_1",
345 "action_id": "123456",
347 "task_id": "123456:1",
348 "status": "SCHEDULED",
351 "target_record": "test_target_record",
352 "target_record_id": "test_target_record_id",
355 "action_id": "123456",
360 task
= Ns
._create
_task
(
361 deployment_info
=deployment_info
,
362 target_id
="vim_openstack_1",
365 target_record
="test_target_record",
366 target_record_id
="test_target_record_id",
369 self
.assertEqual(deployment_info
.get("task_index"), 2)
370 self
.assertDictEqual(task
, expected_result
)
372 def test__create_task(self
):
374 "target_id": "vim_openstack_1",
375 "action_id": "123456",
377 "task_id": "123456:1",
378 "status": "SCHEDULED",
381 "target_record": "test_target_record",
382 "target_record_id": "test_target_record_id",
383 # values coming from extra_dict
384 "params": "test_params",
385 "find_params": "test_find_params",
386 "depends_on": "test_depends_on",
389 "action_id": "123456",
394 task
= Ns
._create
_task
(
395 deployment_info
=deployment_info
,
396 target_id
="vim_openstack_1",
399 target_record
="test_target_record",
400 target_record_id
="test_target_record_id",
402 "params": "test_params",
403 "find_params": "test_find_params",
404 "depends_on": "test_depends_on",
408 self
.assertEqual(deployment_info
.get("task_index"), 2)
409 self
.assertDictEqual(task
, expected_result
)
411 @patch("osm_ng_ro.ns.time")
412 def test__create_ro_task(self
, mock_time
: Mock
):
413 now
= 1637324838.994551
414 mock_time
.return_value
= now
416 "target_id": "vim_openstack_1",
417 "action_id": "123456",
419 "task_id": "123456:1",
420 "status": "SCHEDULED",
423 "target_record": "test_target_record",
424 "target_record_id": "test_target_record_id",
425 # values coming from extra_dict
426 "params": "test_params",
427 "find_params": "test_find_params",
428 "depends_on": "test_depends_on",
434 "target_id": "vim_openstack_1",
437 "created_items": None,
451 ro_task
= Ns
._create
_ro
_task
(
452 target_id
="vim_openstack_1",
456 self
.assertDictEqual(ro_task
, expected_result
)
458 def test__process_image_params_with_empty_target_image(self
):
464 result
= Ns
._process
_image
_params
(
465 target_image
=target_image
,
468 target_record_id
=None,
471 self
.assertDictEqual(expected_result
, result
)
473 def test__process_image_params_with_wrong_target_image(self
):
478 "no_image": "to_see_here",
481 result
= Ns
._process
_image
_params
(
482 target_image
=target_image
,
485 target_record_id
=None,
488 self
.assertDictEqual(expected_result
, result
)
490 def test__process_image_params_with_image(self
):
502 result
= Ns
._process
_image
_params
(
503 target_image
=target_image
,
506 target_record_id
=None,
509 self
.assertDictEqual(expected_result
, result
)
511 def test__process_image_params_with_vim_image_id(self
):
520 "vim_image_id": "123456",
523 result
= Ns
._process
_image
_params
(
524 target_image
=target_image
,
527 target_record_id
=None,
530 self
.assertDictEqual(expected_result
, result
)
532 def test__process_image_params_with_image_checksum(self
):
536 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
541 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
544 result
= Ns
._process
_image
_params
(
545 target_image
=target_image
,
548 target_record_id
=None,
551 self
.assertDictEqual(expected_result
, result
)
553 def test__get_resource_allocation_params_with_empty_target_image(self
):
555 quota_descriptor
= {}
557 result
= Ns
._get
_resource
_allocation
_params
(
558 quota_descriptor
=quota_descriptor
,
561 self
.assertDictEqual(expected_result
, result
)
563 def test__get_resource_allocation_params_with_wrong_target_image(self
):
566 "no_quota": "present_here",
569 result
= Ns
._get
_resource
_allocation
_params
(
570 quota_descriptor
=quota_descriptor
,
573 self
.assertDictEqual(expected_result
, result
)
575 def test__get_resource_allocation_params_with_limit(self
):
583 result
= Ns
._get
_resource
_allocation
_params
(
584 quota_descriptor
=quota_descriptor
,
587 self
.assertDictEqual(expected_result
, result
)
589 def test__get_resource_allocation_params_with_reserve(self
):
597 result
= Ns
._get
_resource
_allocation
_params
(
598 quota_descriptor
=quota_descriptor
,
601 self
.assertDictEqual(expected_result
, result
)
603 def test__get_resource_allocation_params_with_shares(self
):
611 result
= Ns
._get
_resource
_allocation
_params
(
612 quota_descriptor
=quota_descriptor
,
615 self
.assertDictEqual(expected_result
, result
)
617 def test__get_resource_allocation_params(self
):
629 result
= Ns
._get
_resource
_allocation
_params
(
630 quota_descriptor
=quota_descriptor
,
633 self
.assertDictEqual(expected_result
, result
)
635 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
636 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
644 result
= Ns
._process
_guest
_epa
_quota
_params
(
645 guest_epa_quota
=guest_epa_quota
,
646 epa_vcpu_set
=epa_vcpu_set
,
649 self
.assertDictEqual(expected_result
, result
)
650 self
.assertFalse(resource_allocation
.called
)
652 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
653 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
661 result
= Ns
._process
_guest
_epa
_quota
_params
(
662 guest_epa_quota
=guest_epa_quota
,
663 epa_vcpu_set
=epa_vcpu_set
,
666 self
.assertDictEqual(expected_result
, result
)
667 self
.assertFalse(resource_allocation
.called
)
669 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
670 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
676 "no-quota": "nothing",
680 result
= Ns
._process
_guest
_epa
_quota
_params
(
681 guest_epa_quota
=guest_epa_quota
,
682 epa_vcpu_set
=epa_vcpu_set
,
685 self
.assertDictEqual(expected_result
, result
)
686 self
.assertFalse(resource_allocation
.called
)
688 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
689 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
695 "no-quota": "nothing",
699 result
= Ns
._process
_guest
_epa
_quota
_params
(
700 guest_epa_quota
=guest_epa_quota
,
701 epa_vcpu_set
=epa_vcpu_set
,
704 self
.assertDictEqual(expected_result
, result
)
705 self
.assertFalse(resource_allocation
.called
)
707 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
708 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
722 result
= Ns
._process
_guest
_epa
_quota
_params
(
723 guest_epa_quota
=guest_epa_quota
,
724 epa_vcpu_set
=epa_vcpu_set
,
727 self
.assertDictEqual(expected_result
, result
)
728 self
.assertFalse(resource_allocation
.called
)
730 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
731 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
751 resource_allocation_param
= {
756 resource_allocation
.return_value
= {
762 result
= Ns
._process
_guest
_epa
_quota
_params
(
763 guest_epa_quota
=guest_epa_quota
,
764 epa_vcpu_set
=epa_vcpu_set
,
767 resource_allocation
.assert_called_once_with(resource_allocation_param
)
768 self
.assertDictEqual(expected_result
, result
)
770 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
771 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
791 resource_allocation_param
= {
796 resource_allocation
.return_value
= {
802 result
= Ns
._process
_guest
_epa
_quota
_params
(
803 guest_epa_quota
=guest_epa_quota
,
804 epa_vcpu_set
=epa_vcpu_set
,
807 resource_allocation
.assert_called_once_with(resource_allocation_param
)
808 self
.assertDictEqual(expected_result
, result
)
810 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
811 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
831 resource_allocation_param
= {
836 resource_allocation
.return_value
= {
842 result
= Ns
._process
_guest
_epa
_quota
_params
(
843 guest_epa_quota
=guest_epa_quota
,
844 epa_vcpu_set
=epa_vcpu_set
,
847 resource_allocation
.assert_called_once_with(resource_allocation_param
)
848 self
.assertDictEqual(expected_result
, result
)
850 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
851 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
871 resource_allocation_param
= {
876 resource_allocation
.return_value
= {
882 result
= Ns
._process
_guest
_epa
_quota
_params
(
883 guest_epa_quota
=guest_epa_quota
,
884 epa_vcpu_set
=epa_vcpu_set
,
887 resource_allocation
.assert_called_once_with(resource_allocation_param
)
888 self
.assertDictEqual(expected_result
, result
)
890 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
891 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
911 resource_allocation_param
= {
916 resource_allocation
.return_value
= {
922 result
= Ns
._process
_guest
_epa
_quota
_params
(
923 guest_epa_quota
=guest_epa_quota
,
924 epa_vcpu_set
=epa_vcpu_set
,
927 resource_allocation
.assert_called_once_with(resource_allocation_param
)
928 self
.assertDictEqual(expected_result
, result
)
930 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
931 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
951 resource_allocation_param
= {
956 resource_allocation
.return_value
= {
962 result
= Ns
._process
_guest
_epa
_quota
_params
(
963 guest_epa_quota
=guest_epa_quota
,
964 epa_vcpu_set
=epa_vcpu_set
,
967 resource_allocation
.assert_called_once_with(resource_allocation_param
)
968 self
.assertDictEqual(expected_result
, result
)
970 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
971 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
991 resource_allocation_param
= {
996 resource_allocation
.return_value
= {
1002 result
= Ns
._process
_guest
_epa
_quota
_params
(
1003 guest_epa_quota
=guest_epa_quota
,
1004 epa_vcpu_set
=epa_vcpu_set
,
1007 resource_allocation
.assert_called_once_with(resource_allocation_param
)
1008 self
.assertDictEqual(expected_result
, result
)
1010 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1011 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1013 resource_allocation
,
1056 resource_allocation
.return_value
= {
1062 result
= Ns
._process
_guest
_epa
_quota
_params
(
1063 guest_epa_quota
=guest_epa_quota
,
1064 epa_vcpu_set
=epa_vcpu_set
,
1067 self
.assertTrue(resource_allocation
.called
)
1068 self
.assertDictEqual(expected_result
, result
)
1070 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1071 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1073 resource_allocation
,
1119 epa_vcpu_set
= False
1121 resource_allocation
.return_value
= {
1127 result
= Ns
._process
_guest
_epa
_quota
_params
(
1128 guest_epa_quota
=guest_epa_quota
,
1129 epa_vcpu_set
=epa_vcpu_set
,
1132 self
.assertTrue(resource_allocation
.called
)
1133 self
.assertDictEqual(expected_result
, result
)
1135 def test__process_guest_epa_numa_params_with_empty_numa_params(self
):
1136 expected_numa_result
= []
1137 expected_epa_vcpu_set_result
= False
1138 guest_epa_quota
= {}
1140 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1141 guest_epa_quota
=guest_epa_quota
,
1143 self
.assertEqual(expected_numa_result
, numa_result
)
1144 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1146 def test__process_guest_epa_numa_params_with_wrong_numa_params(self
):
1147 expected_numa_result
= []
1148 expected_epa_vcpu_set_result
= False
1149 guest_epa_quota
= {"no_nume": "here"}
1151 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1152 guest_epa_quota
=guest_epa_quota
,
1155 self
.assertEqual(expected_numa_result
, numa_result
)
1156 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1158 def test__process_guest_epa_numa_params_with_numa_node_policy(self
):
1159 expected_numa_result
= []
1160 expected_epa_vcpu_set_result
= False
1161 guest_epa_quota
= {"numa-node-policy": {}}
1163 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1164 guest_epa_quota
=guest_epa_quota
,
1167 self
.assertEqual(expected_numa_result
, numa_result
)
1168 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1170 def test__process_guest_epa_numa_params_with_no_node(self
):
1171 expected_numa_result
= []
1172 expected_epa_vcpu_set_result
= False
1174 "numa-node-policy": {
1179 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1180 guest_epa_quota
=guest_epa_quota
,
1183 self
.assertEqual(expected_numa_result
, numa_result
)
1184 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1186 def test__process_guest_epa_numa_params_with_1_node_num_cores(self
):
1187 expected_numa_result
= [{"cores": 3}]
1188 expected_epa_vcpu_set_result
= True
1190 "numa-node-policy": {
1199 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1200 guest_epa_quota
=guest_epa_quota
,
1203 self
.assertEqual(expected_numa_result
, numa_result
)
1204 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1206 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self
):
1207 expected_numa_result
= [{"paired_threads": 3}]
1208 expected_epa_vcpu_set_result
= True
1210 "numa-node-policy": {
1213 "paired-threads": {"num-paired-threads": "3"},
1219 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1220 guest_epa_quota
=guest_epa_quota
,
1223 self
.assertEqual(expected_numa_result
, numa_result
)
1224 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1226 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self
):
1227 expected_numa_result
= [
1229 "paired-threads-id": [("0", "1"), ("4", "5")],
1232 expected_epa_vcpu_set_result
= False
1234 "numa-node-policy": {
1238 "paired-thread-ids": [
1254 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1255 guest_epa_quota
=guest_epa_quota
,
1258 self
.assertEqual(expected_numa_result
, numa_result
)
1259 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1261 def test__process_guest_epa_numa_params_with_1_node_num_threads(self
):
1262 expected_numa_result
= [{"threads": 3}]
1263 expected_epa_vcpu_set_result
= True
1265 "numa-node-policy": {
1274 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1275 guest_epa_quota
=guest_epa_quota
,
1278 self
.assertEqual(expected_numa_result
, numa_result
)
1279 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1281 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self
):
1282 expected_numa_result
= [{"memory": 2}]
1283 expected_epa_vcpu_set_result
= False
1285 "numa-node-policy": {
1294 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1295 guest_epa_quota
=guest_epa_quota
,
1298 self
.assertEqual(expected_numa_result
, numa_result
)
1299 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1301 def test__process_guest_epa_numa_params_with_1_node_vcpu(self
):
1302 expected_numa_result
= [
1308 expected_epa_vcpu_set_result
= False
1310 "numa-node-policy": {
1311 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1315 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1316 guest_epa_quota
=guest_epa_quota
,
1319 self
.assertEqual(expected_numa_result
, numa_result
)
1320 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1322 def test__process_guest_epa_numa_params_with_2_node_vcpu(self
):
1323 expected_numa_result
= [
1334 expected_epa_vcpu_set_result
= False
1336 "numa-node-policy": {
1338 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1339 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1344 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1345 guest_epa_quota
=guest_epa_quota
,
1348 self
.assertEqual(expected_numa_result
, numa_result
)
1349 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1351 def test__process_guest_epa_numa_params_with_1_node(self
):
1352 expected_numa_result
= [
1357 "paired_threads": 3,
1358 "paired-threads-id": [("0", "1"), ("4", "5")],
1363 expected_epa_vcpu_set_result
= True
1365 "numa-node-policy": {
1370 "num-paired-threads": "3",
1371 "paired-thread-ids": [
1389 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1390 guest_epa_quota
=guest_epa_quota
,
1393 self
.assertEqual(expected_numa_result
, numa_result
)
1394 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1396 def test__process_guest_epa_numa_params_with_2_nodes(self
):
1397 expected_numa_result
= [
1400 "paired_threads": 3,
1401 "paired-threads-id": [("0", "1"), ("4", "5")],
1407 "paired_threads": 7,
1408 "paired-threads-id": [("2", "3"), ("5", "6")],
1413 expected_epa_vcpu_set_result
= True
1415 "numa-node-policy": {
1420 "num-paired-threads": "3",
1421 "paired-thread-ids": [
1438 "num-paired-threads": "7",
1439 "paired-thread-ids": [
1457 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_numa
_params
(
1458 guest_epa_quota
=guest_epa_quota
,
1461 self
.assertEqual(expected_numa_result
, numa_result
)
1462 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1464 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self
):
1465 expected_numa_result
= {}
1466 expected_epa_vcpu_set_result
= False
1467 guest_epa_quota
= {}
1469 epa_vcpu_set
= False
1471 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1472 guest_epa_quota
=guest_epa_quota
,
1473 vcpu_count
=vcpu_count
,
1474 epa_vcpu_set
=epa_vcpu_set
,
1477 self
.assertDictEqual(expected_numa_result
, numa_result
)
1478 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1480 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self
):
1481 expected_numa_result
= {}
1482 expected_epa_vcpu_set_result
= False
1484 "no-cpu-pinning-policy": "here",
1487 epa_vcpu_set
= False
1489 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1490 guest_epa_quota
=guest_epa_quota
,
1491 vcpu_count
=vcpu_count
,
1492 epa_vcpu_set
=epa_vcpu_set
,
1495 self
.assertDictEqual(expected_numa_result
, numa_result
)
1496 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1498 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self
):
1499 expected_numa_result
= {}
1500 expected_epa_vcpu_set_result
= True
1502 "cpu-pinning-policy": "DEDICATED",
1507 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1508 guest_epa_quota
=guest_epa_quota
,
1509 vcpu_count
=vcpu_count
,
1510 epa_vcpu_set
=epa_vcpu_set
,
1513 self
.assertDictEqual(expected_numa_result
, numa_result
)
1514 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1516 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self
):
1517 expected_numa_result
= {"threads": 3}
1518 expected_epa_vcpu_set_result
= True
1520 "cpu-pinning-policy": "DEDICATED",
1521 "cpu-thread-pinning-policy": "PREFER",
1524 epa_vcpu_set
= False
1526 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1527 guest_epa_quota
=guest_epa_quota
,
1528 vcpu_count
=vcpu_count
,
1529 epa_vcpu_set
=epa_vcpu_set
,
1532 self
.assertDictEqual(expected_numa_result
, numa_result
)
1533 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1535 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self
):
1536 expected_numa_result
= {"cores": 3}
1537 expected_epa_vcpu_set_result
= True
1539 "cpu-pinning-policy": "DEDICATED",
1540 "cpu-thread-pinning-policy": "ISOLATE",
1543 epa_vcpu_set
= False
1545 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1546 guest_epa_quota
=guest_epa_quota
,
1547 vcpu_count
=vcpu_count
,
1548 epa_vcpu_set
=epa_vcpu_set
,
1551 self
.assertDictEqual(expected_numa_result
, numa_result
)
1552 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1554 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self
):
1555 expected_numa_result
= {"threads": 3}
1556 expected_epa_vcpu_set_result
= True
1558 "cpu-pinning-policy": "DEDICATED",
1559 "cpu-thread-pinning-policy": "REQUIRE",
1562 epa_vcpu_set
= False
1564 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1565 guest_epa_quota
=guest_epa_quota
,
1566 vcpu_count
=vcpu_count
,
1567 epa_vcpu_set
=epa_vcpu_set
,
1570 self
.assertDictEqual(expected_numa_result
, numa_result
)
1571 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1573 def test__process_guest_epa_cpu_pinning_params(self
):
1574 expected_numa_result
= {"threads": 3}
1575 expected_epa_vcpu_set_result
= True
1577 "cpu-pinning-policy": "DEDICATED",
1580 epa_vcpu_set
= False
1582 numa_result
, epa_vcpu_set_result
= Ns
._process
_guest
_epa
_cpu
_pinning
_params
(
1583 guest_epa_quota
=guest_epa_quota
,
1584 vcpu_count
=vcpu_count
,
1585 epa_vcpu_set
=epa_vcpu_set
,
1588 self
.assertDictEqual(expected_numa_result
, numa_result
)
1589 self
.assertEqual(expected_epa_vcpu_set_result
, epa_vcpu_set_result
)
1591 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1592 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1593 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1594 def test__process_guest_epa_params_with_empty_params(
1596 guest_epa_numa_params
,
1597 guest_epa_cpu_pinning_params
,
1598 guest_epa_quota_params
,
1600 expected_result
= {}
1603 result
= Ns
._process
_epa
_params
(
1604 target_flavor
=target_flavor
,
1607 self
.assertDictEqual(expected_result
, result
)
1608 self
.assertFalse(guest_epa_numa_params
.called
)
1609 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1610 self
.assertFalse(guest_epa_quota_params
.called
)
1612 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1613 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1614 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1615 def test__process_guest_epa_params_with_wrong_params(
1617 guest_epa_numa_params
,
1618 guest_epa_cpu_pinning_params
,
1619 guest_epa_quota_params
,
1621 expected_result
= {}
1623 "no-guest-epa": "here",
1626 result
= Ns
._process
_epa
_params
(
1627 target_flavor
=target_flavor
,
1630 self
.assertDictEqual(expected_result
, result
)
1631 self
.assertFalse(guest_epa_numa_params
.called
)
1632 self
.assertFalse(guest_epa_cpu_pinning_params
.called
)
1633 self
.assertFalse(guest_epa_quota_params
.called
)
1635 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1636 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1637 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1638 def test__process_guest_epa_params(
1640 guest_epa_numa_params
,
1641 guest_epa_cpu_pinning_params
,
1642 guest_epa_quota_params
,
1645 "mem-policy": "STRICT",
1650 "numa-node-policy": {
1651 "mem-policy": "STRICT",
1656 guest_epa_numa_params
.return_value
= ({}, False)
1657 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1658 guest_epa_quota_params
.return_value
= {}
1660 result
= Ns
._process
_epa
_params
(
1661 target_flavor
=target_flavor
,
1664 self
.assertDictEqual(expected_result
, result
)
1665 self
.assertTrue(guest_epa_numa_params
.called
)
1666 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1667 self
.assertTrue(guest_epa_quota_params
.called
)
1669 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1670 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1671 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1672 def test__process_guest_epa_params_with_mempage_size(
1674 guest_epa_numa_params
,
1675 guest_epa_cpu_pinning_params
,
1676 guest_epa_quota_params
,
1679 "mempage-size": "1G",
1680 "mem-policy": "STRICT",
1685 "mempage-size": "1G",
1686 "numa-node-policy": {
1687 "mem-policy": "STRICT",
1692 guest_epa_numa_params
.return_value
= ({}, False)
1693 guest_epa_cpu_pinning_params
.return_value
= ({}, False)
1694 guest_epa_quota_params
.return_value
= {}
1696 result
= Ns
._process
_epa
_params
(
1697 target_flavor
=target_flavor
,
1700 self
.assertDictEqual(expected_result
, result
)
1701 self
.assertTrue(guest_epa_numa_params
.called
)
1702 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1703 self
.assertTrue(guest_epa_quota_params
.called
)
1705 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1706 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1707 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1708 def test__process_guest_epa_params_with_numa(
1710 guest_epa_numa_params
,
1711 guest_epa_cpu_pinning_params
,
1712 guest_epa_quota_params
,
1715 "mempage-size": "1G",
1716 "cpu-pinning-policy": "DEDICATED",
1717 "cpu-thread-pinning-policy": "PREFER",
1722 "paired-threads": 3,
1723 "paired-threads-id": [("0", "1"), ("4", "5")],
1727 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1728 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1729 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1730 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1735 "mempage-size": "1G",
1736 "cpu-pinning-policy": "DEDICATED",
1737 "cpu-thread-pinning-policy": "PREFER",
1738 "numa-node-policy": {
1743 "num-paired-threads": "3",
1744 "paired-thread-ids": [
1783 guest_epa_numa_params
.return_value
= (
1787 "paired-threads": 3,
1788 "paired-threads-id": [("0", "1"), ("4", "5")],
1795 guest_epa_cpu_pinning_params
.return_value
= (
1801 guest_epa_quota_params
.return_value
= {
1824 result
= Ns
._process
_epa
_params
(
1825 target_flavor
=target_flavor
,
1827 self
.assertEqual(expected_result
, result
)
1828 self
.assertTrue(guest_epa_numa_params
.called
)
1829 self
.assertTrue(guest_epa_cpu_pinning_params
.called
)
1830 self
.assertTrue(guest_epa_quota_params
.called
)
1832 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1833 def test__process_flavor_params_with_empty_target_flavor(
1841 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1846 target_record_id
= ""
1848 with self
.assertRaises(KeyError):
1849 Ns
._process
_flavor
_params
(
1850 target_flavor
=target_flavor
,
1853 target_record_id
=target_record_id
,
1856 self
.assertFalse(epa_params
.called
)
1858 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1859 def test__process_flavor_params_with_wrong_target_flavor(
1864 "no-target-flavor": "here",
1868 target_record_id
= ""
1870 with self
.assertRaises(KeyError):
1871 Ns
._process
_flavor
_params
(
1872 target_flavor
=target_flavor
,
1875 target_record_id
=target_record_id
,
1878 self
.assertFalse(epa_params
.called
)
1880 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1881 def test__process_flavor_params_with_empty_indata(
1905 "memory-mb": "1024",
1910 target_record_id
= ""
1912 epa_params
.return_value
= {}
1914 result
= Ns
._process
_flavor
_params
(
1915 target_flavor
=target_flavor
,
1918 target_record_id
=target_record_id
,
1921 self
.assertTrue(epa_params
.called
)
1922 self
.assertDictEqual(result
, expected_result
)
1924 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1925 def test__process_flavor_params_with_wrong_indata(
1949 "memory-mb": "1024",
1956 target_record_id
= ""
1958 epa_params
.return_value
= {}
1960 result
= Ns
._process
_flavor
_params
(
1961 target_flavor
=target_flavor
,
1964 target_record_id
=target_record_id
,
1967 self
.assertTrue(epa_params
.called
)
1968 self
.assertDictEqual(result
, expected_result
)
1970 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1971 def test__process_flavor_params_with_ephemeral_disk(
1979 db
.get_one
.return_value
= {
1980 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1985 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1989 "id": "without_volumes-vnf",
1990 "product-name": "without_volumes-vnf",
1993 "id": "without_volumes-VM",
1994 "name": "without_volumes-VM",
1995 "sw-image-desc": "ubuntu20.04",
1996 "alternative-sw-image-desc": [
1998 "ubuntu20.04-azure",
2000 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2004 "virtual-storage-desc": [
2005 {"id": "root-volume", "size-of-storage": "10"},
2007 "id": "ephemeral-volume",
2008 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2009 "size-of-storage": "1",
2015 "path": "/app/storage/",
2043 "memory-mb": "1024",
2051 "ns-flavor-id": "test_id",
2052 "virtual-storages": [
2054 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2055 "size-of-storage": "10",
2060 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2065 target_record_id
= ""
2067 epa_params
.return_value
= {}
2069 result
= Ns
._process
_flavor
_params
(
2070 target_flavor
=target_flavor
,
2073 target_record_id
=target_record_id
,
2077 self
.assertTrue(epa_params
.called
)
2078 self
.assertDictEqual(result
, expected_result
)
2080 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2081 def test__process_flavor_params_with_swap_disk(
2108 "memory-mb": "1024",
2116 "ns-flavor-id": "test_id",
2117 "virtual-storages": [
2119 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2120 "size-of-storage": "20",
2125 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2130 target_record_id
= ""
2132 epa_params
.return_value
= {}
2134 result
= Ns
._process
_flavor
_params
(
2135 target_flavor
=target_flavor
,
2138 target_record_id
=target_record_id
,
2141 self
.assertTrue(epa_params
.called
)
2142 self
.assertDictEqual(result
, expected_result
)
2144 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2145 def test__process_flavor_params_with_persistent_root_disk(
2153 db
.get_one
.return_value
= {
2154 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2159 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2163 "id": "several_volumes-vnf",
2164 "product-name": "several_volumes-vnf",
2167 "id": "several_volumes-VM",
2168 "name": "several_volumes-VM",
2169 "sw-image-desc": "ubuntu20.04",
2170 "alternative-sw-image-desc": [
2172 "ubuntu20.04-azure",
2174 "virtual-storage-desc": [
2175 "persistent-root-volume",
2180 "virtual-storage-desc": [
2182 "id": "persistent-root-volume",
2183 "type-of-storage": "persistent-storage:persistent-storage",
2184 "size-of-storage": "10",
2190 "path": "/app/storage/",
2216 "memory-mb": "1024",
2224 "vdu-name": "several_volumes-VM",
2225 "ns-flavor-id": "test_id",
2226 "virtual-storages": [
2228 "type-of-storage": "persistent-storage:persistent-storage",
2229 "size-of-storage": "10",
2234 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2239 target_record_id
= ""
2241 epa_params
.return_value
= {}
2243 result
= Ns
._process
_flavor
_params
(
2244 target_flavor
=target_flavor
,
2247 target_record_id
=target_record_id
,
2251 self
.assertTrue(epa_params
.called
)
2252 self
.assertDictEqual(result
, expected_result
)
2254 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2255 def test__process_flavor_params_with_epa_params(
2266 "numa": "there-is-numa-here",
2277 "numa": "there-is-numa-here",
2286 "memory-mb": "1024",
2294 "ns-flavor-id": "test_id",
2297 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2302 target_record_id
= ""
2304 epa_params
.return_value
= {
2305 "numa": "there-is-numa-here",
2308 result
= Ns
._process
_flavor
_params
(
2309 target_flavor
=target_flavor
,
2312 target_record_id
=target_record_id
,
2315 self
.assertTrue(epa_params
.called
)
2316 self
.assertDictEqual(result
, expected_result
)
2318 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2319 def test__process_flavor_params(
2327 db
.get_one
.return_value
= {
2328 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2333 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2337 "id": "without_volumes-vnf",
2338 "product-name": "without_volumes-vnf",
2341 "id": "without_volumes-VM",
2342 "name": "without_volumes-VM",
2343 "sw-image-desc": "ubuntu20.04",
2344 "alternative-sw-image-desc": [
2346 "ubuntu20.04-azure",
2348 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2352 "virtual-storage-desc": [
2353 {"id": "root-volume", "size-of-storage": "10"},
2355 "id": "ephemeral-volume",
2356 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2357 "size-of-storage": "1",
2363 "path": "/app/storage/",
2378 "numa": "there-is-numa-here",
2391 "numa": "there-is-numa-here",
2400 "memory-mb": "1024",
2408 "ns-flavor-id": "test_id",
2409 "virtual-storages": [
2411 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2412 "size-of-storage": "10",
2415 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2416 "size-of-storage": "20",
2421 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2426 target_record_id
= ""
2428 epa_params
.return_value
= {
2429 "numa": "there-is-numa-here",
2432 result
= Ns
._process
_flavor
_params
(
2433 target_flavor
=target_flavor
,
2436 target_record_id
=target_record_id
,
2440 self
.assertTrue(epa_params
.called
)
2441 self
.assertDictEqual(result
, expected_result
)
2443 def test__process_net_params_with_empty_params(
2453 "provider_network": "some-profile-here",
2455 "some_ip_profile": "here",
2458 target_record_id
= ""
2461 "net_name": "ns-name-vld-name",
2462 "net_type": "bridge",
2464 "some_ip_profile": "here",
2466 "provider_network_profile": "some-profile-here",
2470 result
= Ns
._process
_net
_params
(
2471 target_vld
=target_vld
,
2474 target_record_id
=target_record_id
,
2477 self
.assertDictEqual(expected_result
, result
)
2479 def test__process_net_params_with_vim_info_sdn(
2490 "sdn-ports": ["some", "ports", "here"],
2491 "vlds": ["some", "vlds", "here"],
2494 target_record_id
= "vld.sdn.something"
2497 "sdn-ports": ["some", "ports", "here"],
2498 "vlds": ["some", "vlds", "here"],
2503 result
= Ns
._process
_net
_params
(
2504 target_vld
=target_vld
,
2507 target_record_id
=target_record_id
,
2510 self
.assertDictEqual(expected_result
, result
)
2512 def test__process_net_params_with_vim_info_sdn_target_vim(
2523 "sdn-ports": ["some", "ports", "here"],
2524 "vlds": ["some", "vlds", "here"],
2525 "target_vim": "some-vim",
2528 target_record_id
= "vld.sdn.something"
2530 "depends_on": ["some-vim vld.sdn"],
2532 "sdn-ports": ["some", "ports", "here"],
2533 "vlds": ["some", "vlds", "here"],
2534 "target_vim": "some-vim",
2539 result
= Ns
._process
_net
_params
(
2540 target_vld
=target_vld
,
2543 target_record_id
=target_record_id
,
2546 self
.assertDictEqual(expected_result
, result
)
2548 def test__process_net_params_with_vim_network_name(
2558 "vim_network_name": "some-network-name",
2560 target_record_id
= "vld.sdn.something"
2564 "name": "some-network-name",
2569 result
= Ns
._process
_net
_params
(
2570 target_vld
=target_vld
,
2573 target_record_id
=target_record_id
,
2576 self
.assertDictEqual(expected_result
, result
)
2578 def test__process_net_params_with_vim_network_id(
2588 "vim_network_id": "some-network-id",
2590 target_record_id
= "vld.sdn.something"
2594 "id": "some-network-id",
2599 result
= Ns
._process
_net
_params
(
2600 target_vld
=target_vld
,
2603 target_record_id
=target_record_id
,
2606 self
.assertDictEqual(expected_result
, result
)
2608 def test__process_net_params_with_mgmt_network(
2614 "mgmt-network": "some-mgmt-network",
2620 target_record_id
= "vld.sdn.something"
2628 result
= Ns
._process
_net
_params
(
2629 target_vld
=target_vld
,
2632 target_record_id
=target_record_id
,
2635 self
.assertDictEqual(expected_result
, result
)
2637 def test__process_net_params_with_underlay_eline(
2642 "underlay": "some-underlay-here",
2649 "provider_network": "some-profile-here",
2651 "some_ip_profile": "here",
2654 target_record_id
= ""
2658 "some_ip_profile": "here",
2660 "net_name": "ns-name-vld-name",
2662 "provider_network_profile": "some-profile-here",
2666 result
= Ns
._process
_net
_params
(
2667 target_vld
=target_vld
,
2670 target_record_id
=target_record_id
,
2673 self
.assertDictEqual(expected_result
, result
)
2675 def test__process_net_params_with_underlay_elan(
2680 "underlay": "some-underlay-here",
2687 "provider_network": "some-profile-here",
2689 "some_ip_profile": "here",
2692 target_record_id
= ""
2696 "some_ip_profile": "here",
2698 "net_name": "ns-name-vld-name",
2700 "provider_network_profile": "some-profile-here",
2704 result
= Ns
._process
_net
_params
(
2705 target_vld
=target_vld
,
2708 target_record_id
=target_record_id
,
2711 self
.assertDictEqual(expected_result
, result
)
2713 def test__get_cloud_init_exception(self
):
2714 db_mock
= MagicMock(name
="database mock")
2719 with self
.assertRaises(NsException
):
2720 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2722 def test__get_cloud_init_file_fs_exception(self
):
2723 db_mock
= MagicMock(name
="database mock")
2726 location
= "vnfr_id_123456:file:test_file"
2727 db_mock
.get_one
.return_value
= {
2730 "folder": "/home/osm",
2731 "pkg-dir": "vnfr_test_dir",
2736 with self
.assertRaises(NsException
):
2737 Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2739 def test__get_cloud_init_file(self
):
2740 db_mock
= MagicMock(name
="database mock")
2741 fs_mock
= MagicMock(name
="filesystem mock")
2742 file_mock
= MagicMock(name
="file mock")
2744 location
= "vnfr_id_123456:file:test_file"
2745 cloud_init_content
= "this is a cloud init file content"
2747 db_mock
.get_one
.return_value
= {
2750 "folder": "/home/osm",
2751 "pkg-dir": "vnfr_test_dir",
2755 fs_mock
.file_open
.return_value
= file_mock
2756 file_mock
.__enter
__.return_value
.read
.return_value
= cloud_init_content
2758 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2760 self
.assertEqual(cloud_init_content
, result
)
2762 def test__get_cloud_init_vdu(self
):
2763 db_mock
= MagicMock(name
="database mock")
2766 location
= "vnfr_id_123456:vdu:0"
2767 cloud_init_content
= "this is a cloud init file content"
2769 db_mock
.get_one
.return_value
= {
2772 "cloud-init": cloud_init_content
,
2777 result
= Ns
._get
_cloud
_init
(db
=db_mock
, fs
=fs_mock
, location
=location
)
2779 self
.assertEqual(cloud_init_content
, result
)
2781 @patch("jinja2.Environment.__init__")
2782 def test__parse_jinja2_undefined_error(self
, env_mock
: Mock
):
2783 cloud_init_content
= None
2787 env_mock
.side_effect
= UndefinedError("UndefinedError occurred.")
2789 with self
.assertRaises(NsException
):
2791 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2794 @patch("jinja2.Environment.__init__")
2795 def test__parse_jinja2_template_error(self
, env_mock
: Mock
):
2796 cloud_init_content
= None
2800 env_mock
.side_effect
= TemplateError("TemplateError occurred.")
2802 with self
.assertRaises(NsException
):
2804 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2807 @patch("jinja2.Environment.__init__")
2808 def test__parse_jinja2_template_not_found(self
, env_mock
: Mock
):
2809 cloud_init_content
= None
2813 env_mock
.side_effect
= TemplateNotFound("TemplateNotFound occurred.")
2815 with self
.assertRaises(NsException
):
2817 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2820 def test_rendering_jinja2_temp_without_special_characters(self
):
2821 cloud_init_content
= """
2824 table_type: {{type}}
2826 overwrite: {{is_override}}
2829 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2833 "is_override": "False",
2834 "command": "; mkdir abc",
2836 context
= "cloud-init for VM"
2837 expected_result
= """
2845 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
2847 result
= Ns
._parse
_jinja
2(
2848 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2850 self
.assertEqual(result
, expected_result
)
2852 def test_rendering_jinja2_temp_with_special_characters(self
):
2853 cloud_init_content
= """
2856 table_type: {{type}}
2858 overwrite: {{is_override}}
2861 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2865 "is_override": "False",
2866 "command": "& rm -rf",
2868 context
= "cloud-init for VM"
2869 expected_result
= """
2877 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2879 result
= Ns
._parse
_jinja
2(
2880 cloud_init_content
=cloud_init_content
, params
=params
, context
=context
2882 self
.assertNotEqual(result
, expected_result
)
2884 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self
):
2885 with
patch("osm_ng_ro.ns.Environment") as mock_environment
:
2886 mock_environment
.return_value
= Environment(
2887 undefined
=StrictUndefined
,
2888 autoescape
=select_autoescape(default_for_string
=False, default
=False),
2890 cloud_init_content
= """
2893 table_type: {{type}}
2895 overwrite: {{is_override}}
2898 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2902 "is_override": "False",
2903 "command": "& rm -rf /",
2905 context
= "cloud-init for VM"
2906 expected_result
= """
2914 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2916 result
= Ns
._parse
_jinja
2(
2917 cloud_init_content
=cloud_init_content
,
2921 self
.assertEqual(result
, expected_result
)
2923 @patch("osm_ng_ro.ns.Ns._assign_vim")
2924 def test__rebuild_start_stop_task(self
, assign_vim
):
2927 actions
= ["start", "stop", "rebuild"]
2928 vdu_id
= "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
2929 vnf_id
= "665b4165-ce24-4320-bf19-b9a45bade49f"
2931 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
2932 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
2934 target_vim
= "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
2935 t
= "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
2936 for action
in actions
:
2938 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
2939 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
2940 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
2941 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
2942 "status": "SCHEDULED",
2945 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
2946 "target_record_id": t
,
2948 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
2952 extra_dict
["params"] = {
2953 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
2956 task
= self
.ns
.rebuild_start_stop_task(
2966 self
.assertEqual(task
.get("action_id"), action_id
)
2967 self
.assertEqual(task
.get("nsr_id"), nsr_id
)
2968 self
.assertEqual(task
.get("target_id"), target_vim
)
2969 self
.assertDictEqual(task
, expected_result
)
2971 @patch("osm_ng_ro.ns.Ns._assign_vim")
2972 def test_verticalscale_task(self
, assign_vim
):
2976 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
2977 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
2979 target_record_id
= (
2980 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
2981 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
2985 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
2986 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
2987 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
2988 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
2989 "status": "SCHEDULED",
2991 "item": "verticalscale",
2992 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
2993 "target_record_id": target_record_id
,
2995 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
2996 "flavor_dict": "flavor_dict",
3000 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3002 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3005 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3006 extra_dict
["params"] = {
3007 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3008 "flavor_dict": "flavor_dict",
3010 task
= self
.ns
.verticalscale_task(
3011 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3014 self
.assertDictEqual(task
, expected_result
)
3016 @patch("osm_ng_ro.ns.Ns._assign_vim")
3017 def test_migrate_task(self
, assign_vim
):
3021 action_id
= "bb937f49-3870-4169-b758-9732e1ff40f3"
3022 nsr_id
= "993166fe-723e-4680-ac4b-b1af2541ae31"
3024 target_record_id
= (
3025 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3026 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3030 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3031 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3032 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3033 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3034 "status": "SCHEDULED",
3037 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3038 "target_record_id": target_record_id
,
3040 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3041 "migrate_host": "migrateToHost",
3045 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3047 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3050 vnf
= {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3051 extra_dict
["params"] = {
3052 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3053 "migrate_host": "migrateToHost",
3055 task
= self
.ns
.migrate_task(
3056 vdu
, vnf
, vdu_index
, action_id
, nsr_id
, task_index
, extra_dict
3059 self
.assertDictEqual(task
, expected_result
)
3062 class TestProcessVduParams(unittest
.TestCase
):
3065 self
.logger
= CopyingMock(autospec
=True)
3067 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3068 def test_find_persistent_root_volumes_empty_instantiation_vol_list(
3069 self
, mock_volume_keeping_required
3071 """Find persistent root volume, instantiation_vol_list is empty."""
3072 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3073 target_vdu
= target_vdu_wth_persistent_storage
3074 vdu_instantiation_volumes_list
= []
3076 mock_volume_keeping_required
.return_value
= True
3077 expected_root_disk
= {
3078 "id": "persistent-root-volume",
3079 "type-of-storage": "persistent-storage:persistent-storage",
3080 "size-of-storage": "10",
3081 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
3083 expected_persist_root_disk
= {
3084 "persistent-root-volume": {
3085 "image_id": "ubuntu20.04",
3090 expected_disk_list
= [
3092 "image_id": "ubuntu20.04",
3097 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3098 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3100 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3101 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3102 self
.assertEqual(disk_list
, expected_disk_list
)
3103 self
.assertEqual(len(disk_list
), 1)
3105 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3106 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
3107 self
, mock_volume_keeping_required
3109 """Find persistent root volume, always selects the first vsd as root volume."""
3110 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3111 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3112 "persistent-volume2",
3113 "persistent-root-volume",
3116 target_vdu
= target_vdu_wth_persistent_storage
3117 vdu_instantiation_volumes_list
= []
3119 mock_volume_keeping_required
.return_value
= True
3120 expected_root_disk
= {
3121 "id": "persistent-volume2",
3122 "type-of-storage": "persistent-storage:persistent-storage",
3123 "size-of-storage": "10",
3125 expected_persist_root_disk
= {
3126 "persistent-volume2": {
3127 "image_id": "ubuntu20.04",
3132 expected_disk_list
= [
3134 "image_id": "ubuntu20.04",
3139 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3140 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3142 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3143 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3144 self
.assertEqual(disk_list
, expected_disk_list
)
3145 self
.assertEqual(len(disk_list
), 1)
3147 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3148 def test_find_persistent_root_volumes_empty_size_of_storage(
3149 self
, mock_volume_keeping_required
3151 """Find persistent root volume, size of storage is empty."""
3152 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3153 vnfd
["virtual-storage-desc"][0]["size-of-storage"] = ""
3154 vnfd
["vdu"][0]["virtual-storage-desc"] = [
3155 "persistent-volume2",
3156 "persistent-root-volume",
3159 target_vdu
= target_vdu_wth_persistent_storage
3160 vdu_instantiation_volumes_list
= []
3162 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3163 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3165 self
.assertEqual(persist_root_disk
, None)
3166 mock_volume_keeping_required
.assert_not_called()
3167 self
.assertEqual(disk_list
, [])
3169 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3170 def test_find_persistent_root_volumes_keeping_is_not_required(
3171 self
, mock_volume_keeping_required
3173 """Find persistent root volume, volume keeping is not required."""
3174 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3175 vnfd
["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
3176 {"key": "keep-volume", "value": "false"},
3178 target_vdu
= target_vdu_wth_persistent_storage
3179 vdu_instantiation_volumes_list
= []
3181 mock_volume_keeping_required
.return_value
= False
3182 expected_root_disk
= {
3183 "id": "persistent-root-volume",
3184 "type-of-storage": "persistent-storage:persistent-storage",
3185 "size-of-storage": "10",
3186 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
3188 expected_persist_root_disk
= {
3189 "persistent-root-volume": {
3190 "image_id": "ubuntu20.04",
3195 expected_disk_list
= [
3197 "image_id": "ubuntu20.04",
3202 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3203 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3205 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3206 mock_volume_keeping_required
.assert_called_once_with(expected_root_disk
)
3207 self
.assertEqual(disk_list
, expected_disk_list
)
3208 self
.assertEqual(len(disk_list
), 1)
3210 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3211 def test_find_persistent_root_volumes_target_vdu_mismatch(
3212 self
, mock_volume_keeping_required
3214 """Find persistent root volume, target vdu name is not matching."""
3215 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3216 vnfd
["vdu"][0]["name"] = "Several_Volumes-VM"
3217 target_vdu
= target_vdu_wth_persistent_storage
3218 vdu_instantiation_volumes_list
= []
3220 result
= self
.ns
.find_persistent_root_volumes(
3221 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3223 self
.assertEqual(result
, None)
3224 mock_volume_keeping_required
.assert_not_called()
3225 self
.assertEqual(disk_list
, [])
3226 self
.assertEqual(len(disk_list
), 0)
3228 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3229 def test_find_persistent_root_volumes_with_instantiation_vol_list(
3230 self
, mock_volume_keeping_required
3232 """Find persistent root volume, existing volume needs to be used."""
3233 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3234 target_vdu
= target_vdu_wth_persistent_storage
3235 vdu_instantiation_volumes_list
= [
3237 "vim-volume-id": vim_volume_id
,
3238 "name": "persistent-root-volume",
3242 expected_persist_root_disk
= {
3243 "persistent-root-volume": {
3244 "vim_volume_id": vim_volume_id
,
3245 "image_id": "ubuntu20.04",
3248 expected_disk_list
= [
3250 "vim_volume_id": vim_volume_id
,
3251 "image_id": "ubuntu20.04",
3254 persist_root_disk
= self
.ns
.find_persistent_root_volumes(
3255 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3257 self
.assertEqual(persist_root_disk
, expected_persist_root_disk
)
3258 mock_volume_keeping_required
.assert_not_called()
3259 self
.assertEqual(disk_list
, expected_disk_list
)
3260 self
.assertEqual(len(disk_list
), 1)
3262 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3263 def test_find_persistent_root_volumes_invalid_instantiation_params(
3264 self
, mock_volume_keeping_required
3266 """Find persistent root volume, existing volume id keyword is invalid."""
3267 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
3268 target_vdu
= target_vdu_wth_persistent_storage
3269 vdu_instantiation_volumes_list
= [
3271 "volume-id": vim_volume_id
,
3272 "name": "persistent-root-volume",
3276 with self
.assertRaises(KeyError):
3277 self
.ns
.find_persistent_root_volumes(
3278 vnfd
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3280 mock_volume_keeping_required
.assert_not_called()
3281 self
.assertEqual(disk_list
, [])
3282 self
.assertEqual(len(disk_list
), 0)
3284 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3285 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3286 self
, mock_volume_keeping_required
3288 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3289 persistent_root_disk
= {
3290 "persistent-root-volume": {
3291 "image_id": "ubuntu20.04",
3296 mock_volume_keeping_required
.return_value
= False
3297 target_vdu
= target_vdu_wth_persistent_storage
3298 vdu_instantiation_volumes_list
= []
3301 "image_id": "ubuntu20.04",
3307 "id": "persistent-volume2",
3308 "size-of-storage": "10",
3309 "type-of-storage": "persistent-storage:persistent-storage",
3311 expected_disk_list
= [
3313 "image_id": "ubuntu20.04",
3322 self
.ns
.find_persistent_volumes(
3323 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3325 self
.assertEqual(disk_list
, expected_disk_list
)
3326 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
3328 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3329 def test_find_persistent_volumes_vdu_wth_inst_vol_list(
3330 self
, mock_volume_keeping_required
3332 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3333 persistent_root_disk
= {
3334 "persistent-root-volume": {
3335 "image_id": "ubuntu20.04",
3340 vdu_instantiation_volumes_list
= [
3342 "vim-volume-id": vim_volume_id
,
3343 "name": "persistent-volume2",
3346 target_vdu
= target_vdu_wth_persistent_storage
3349 "image_id": "ubuntu20.04",
3354 expected_disk_list
= [
3356 "image_id": "ubuntu20.04",
3361 "vim_volume_id": vim_volume_id
,
3364 self
.ns
.find_persistent_volumes(
3365 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3367 self
.assertEqual(disk_list
, expected_disk_list
)
3368 mock_volume_keeping_required
.assert_not_called()
3370 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3371 def test_find_persistent_volumes_vdu_wthout_persistent_storage(
3372 self
, mock_volume_keeping_required
3374 """Find persistent ordinary volume, there is not any persistent disk."""
3375 persistent_root_disk
= {}
3376 vdu_instantiation_volumes_list
= []
3377 mock_volume_keeping_required
.return_value
= False
3378 target_vdu
= target_vdu_wthout_persistent_storage
3380 self
.ns
.find_persistent_volumes(
3381 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3383 self
.assertEqual(disk_list
, disk_list
)
3384 mock_volume_keeping_required
.assert_not_called()
3386 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3387 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3388 self
, mock_volume_keeping_required
3390 """There is persistent root disk, but there is not ordinary persistent disk."""
3391 persistent_root_disk
= {
3392 "persistent-root-volume": {
3393 "image_id": "ubuntu20.04",
3398 vdu_instantiation_volumes_list
= []
3399 mock_volume_keeping_required
.return_value
= False
3400 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3401 target_vdu
["virtual-storages"] = [
3403 "id": "persistent-root-volume",
3404 "size-of-storage": "10",
3405 "type-of-storage": "persistent-storage:persistent-storage",
3406 "vdu-storage-requirements": [
3407 {"key": "keep-volume", "value": "true"},
3411 "id": "ephemeral-volume",
3412 "size-of-storage": "1",
3413 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3418 "image_id": "ubuntu20.04",
3423 self
.ns
.find_persistent_volumes(
3424 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3426 self
.assertEqual(disk_list
, disk_list
)
3427 mock_volume_keeping_required
.assert_not_called()
3429 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3430 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
3431 self
, mock_volume_keeping_required
3433 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3434 vim-volume-id is given as instantiation parameter but disk id is not matching.
3436 mock_volume_keeping_required
.return_value
= True
3437 vdu_instantiation_volumes_list
= [
3439 "vim-volume-id": vim_volume_id
,
3440 "name": "persistent-volume3",
3443 persistent_root_disk
= {
3444 "persistent-root-volume": {
3445 "image_id": "ubuntu20.04",
3452 "image_id": "ubuntu20.04",
3457 expected_disk_list
= [
3459 "image_id": "ubuntu20.04",
3469 "id": "persistent-volume2",
3470 "size-of-storage": "10",
3471 "type-of-storage": "persistent-storage:persistent-storage",
3473 target_vdu
= target_vdu_wth_persistent_storage
3474 self
.ns
.find_persistent_volumes(
3475 persistent_root_disk
, target_vdu
, vdu_instantiation_volumes_list
, disk_list
3477 self
.assertEqual(disk_list
, expected_disk_list
)
3478 mock_volume_keeping_required
.assert_called_once_with(expected_disk
)
def test_is_volume_keeping_required_true(self):
    """is_volume_keeping_required returns True when keep-volume is "true"."""
    # The single storage requirement under test: keep-volume explicitly enabled.
    storage_requirements = [{"key": "keep-volume", "value": "true"}]
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": storage_requirements,
    }

    keeping_required = self.ns.is_volume_keeping_required(descriptor)

    self.assertEqual(keeping_required, True)
def test_is_volume_keeping_required_false(self):
    """is_volume_keeping_required returns False when keep-volume is "false"."""
    # The single storage requirement under test: keep-volume explicitly disabled.
    storage_requirements = [{"key": "keep-volume", "value": "false"}]
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": storage_requirements,
    }

    keeping_required = self.ns.is_volume_keeping_required(descriptor)

    self.assertEqual(keeping_required, False)
def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """is_volume_keeping_required returns False without vdu-storage-requirements."""
    # Descriptor deliberately omits the "vdu-storage-requirements" key.
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }

    keeping_required = self.ns.is_volume_keeping_required(descriptor)

    self.assertEqual(keeping_required, False)
def test_is_volume_keeping_required_wrong_keyword(self):
    """A requirement keyed "hold-volume" instead of "keep-volume" yields False."""
    unrelated_requirement = {"key": "hold-volume", "value": "true"}
    storage_desc = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [unrelated_requirement],
    }
    # The value is "true" but the key is not the one the test expects to matter.
    self.assertEqual(self.ns.is_volume_keeping_required(storage_desc), False)
3529 def test_sort_vdu_interfaces_position_all_wth_positions(self
):
3530 """Interfaces are sorted according to position, all have positions."""
3531 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3532 target_vdu
["interfaces"] = [
3535 "ns-vld-id": "datanet",
3540 "ns-vld-id": "mgmtnet",
3544 sorted_interfaces
= [
3547 "ns-vld-id": "mgmtnet",
3552 "ns-vld-id": "datanet",
3556 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3557 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
3559 def test_sort_vdu_interfaces_position_some_wth_position(self
):
3560 """Interfaces are sorted according to position, some of them have positions."""
3561 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3562 target_vdu
["interfaces"] = [
3565 "ns-vld-id": "mgmtnet",
3569 "ns-vld-id": "datanet",
3573 sorted_interfaces
= [
3576 "ns-vld-id": "datanet",
3581 "ns-vld-id": "mgmtnet",
3584 self
.ns
._sort
_vdu
_interfaces
(target_vdu
)
3585 self
.assertEqual(target_vdu
["interfaces"], sorted_interfaces
)
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting a VDU whose interface list is empty leaves it empty."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []
    self.ns._sort_vdu_interfaces(vdu)
    # The call works on the dict in place; the list must still be empty.
    self.assertEqual(vdu["interfaces"], [])
3595 def test_partially_locate_vdu_interfaces(self
):
3596 """Some interfaces have positions."""
3597 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3598 target_vdu
["interfaces"] = [
3601 "ns-vld-id": "net1",
3603 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3606 "ns-vld-id": "mgmtnet",
3610 "ns-vld-id": "datanet",
3614 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3615 self
.assertDictEqual(
3616 target_vdu
["interfaces"][0],
3619 "ns-vld-id": "datanet",
3623 self
.assertDictEqual(
3624 target_vdu
["interfaces"][2],
3625 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3628 def test_partially_locate_vdu_interfaces_position_start_from_0(self
):
3629 """Some interfaces have positions, position start from 0."""
3630 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3631 target_vdu
["interfaces"] = [
3634 "ns-vld-id": "net1",
3636 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3639 "ns-vld-id": "mgmtnet",
3643 "ns-vld-id": "datanet",
3647 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3648 self
.assertDictEqual(
3649 target_vdu
["interfaces"][0],
3652 "ns-vld-id": "datanet",
3656 self
.assertDictEqual(
3657 target_vdu
["interfaces"][3],
3658 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Interfaces without positions keep their content and order.

    The VDU is given a private copy of the module-level
    ``interfaces_wthout_positions`` fixture, and the expectation is a second
    independent copy, so any in-place change made by the method under test
    cannot leak into the shared fixture or into the expected value.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Do not alias the shared fixture list: the call below receives the dict
    # and is free to modify target_vdu["interfaces"] in place.
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(interfaces_wthout_positions)
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_result)
3669 def test_partially_locate_vdu_interfaces_all_has_position(self
):
3670 """All interfaces have position."""
3671 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3672 target_vdu
["interfaces"] = interfaces_wth_all_positions
3673 expected_interfaces
= [
3676 "ns-vld-id": "net2",
3681 "ns-vld-id": "mgmtnet",
3686 "ns-vld-id": "net1",
3690 self
.ns
._partially
_locate
_vdu
_interfaces
(target_vdu
)
3691 self
.assertEqual(target_vdu
["interfaces"], expected_interfaces
)
3693 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3694 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3695 def test_prepare_vdu_cloud_init(self
, mock_parse_jinja2
, mock_get_cloud_init
):
3696 """Target_vdu has cloud-init and boot-data-drive."""
3697 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3698 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3699 target_vdu
["boot-data-drive"] = "vda"
3701 mock_get_cloud_init
.return_value
= cloud_init_content
3702 mock_parse_jinja2
.return_value
= user_data
3704 "user-data": user_data
,
3705 "boot-data-drive": "vda",
3707 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3708 self
.assertDictEqual(result
, expected_result
)
3709 mock_get_cloud_init
.assert_called_once_with(
3710 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3712 mock_parse_jinja2
.assert_called_once_with(
3713 cloud_init_content
=cloud_init_content
,
3715 context
="sample-cloud-init-path",
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception.

    The patched ``_get_cloud_init`` raises ``NsException``; the test expects
    that exception to reach the caller with its message intact, the loader to
    have been called exactly once with the cloud-init location, and
    ``_parse_jinja2`` never to be reached.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    # Empty cache, so the cloud-init content has to be fetched.
    vdu2cloud_init = {}
    mock_get_cloud_init.side_effect = NsException(
        "Mismatch descriptor for cloud init."
    )

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
    # Checked after the context manager exits: err.exception is only set then,
    # and statements placed after the raising call inside the block never run.
    self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")

    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_not_called()
3741 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3742 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3743 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3744 self
, mock_parse_jinja2
, mock_get_cloud_init
3746 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3747 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3748 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3749 target_vdu
["boot-data-drive"] = "vda"
3751 mock_get_cloud_init
.return_value
= cloud_init_content
3752 mock_parse_jinja2
.side_effect
= NsException("Error parsing cloud-init content.")
3754 with self
.assertRaises(NsException
) as err
:
3755 self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3756 self
.assertEqual(str(err
.exception
), "Error parsing cloud-init content.")
3757 mock_get_cloud_init
.assert_called_once_with(
3758 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3760 mock_parse_jinja2
.assert_called_once_with(
3761 cloud_init_content
=cloud_init_content
,
3763 context
="sample-cloud-init-path",
3766 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3767 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3768 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3769 self
, mock_parse_jinja2
, mock_get_cloud_init
3771 """Target_vdu has cloud-init but do not have boot-data-drive."""
3772 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3773 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3775 mock_get_cloud_init
.return_value
= cloud_init_content
3776 mock_parse_jinja2
.return_value
= user_data
3778 "user-data": user_data
,
3780 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3781 self
.assertDictEqual(result
, expected_result
)
3782 mock_get_cloud_init
.assert_called_once_with(
3783 db
=db
, fs
=fs
, location
="sample-cloud-init-path"
3785 mock_parse_jinja2
.assert_called_once_with(
3786 cloud_init_content
=cloud_init_content
,
3788 context
="sample-cloud-init-path",
3791 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3792 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3793 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
3794 self
, mock_parse_jinja2
, mock_get_cloud_init
3796 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
3797 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
3798 target_vdu
["cloud-init"] = "sample-cloud-init-path"
3799 target_vdu
["boot-data-drive"] = "vda"
3800 vdu2cloud_init
= {"sample-cloud-init-path": cloud_init_content
}
3801 mock_parse_jinja2
.return_value
= user_data
3803 "user-data": user_data
,
3804 "boot-data-drive": "vda",
3806 result
= self
.ns
._prepare
_vdu
_cloud
_init
(target_vdu
, vdu2cloud_init
, db
, fs
)
3807 self
.assertDictEqual(result
, expected_result
)
3808 mock_get_cloud_init
.assert_not_called()
3809 mock_parse_jinja2
.assert_called_once_with(
3810 cloud_init_content
=cloud_init_content
,
3812 context
="sample-cloud-init-path",
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_no_cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Only "boot-data-drive" is set on the VDU; no "cloud-init" entry is added.

    The expected result carries just the boot data drive, and neither the
    cloud-init loader nor the jinja2 parser may be called.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["boot-data-drive"] = "vda"
    cloud_init_cache = {}

    prepared = self.ns._prepare_vdu_cloud_init(vdu, cloud_init_cache, db, fs)

    self.assertDictEqual(prepared, {"boot-data-drive": "vda"})
    for untouched_mock in (mock_get_cloud_init, mock_parse_jinja2):
        untouched_mock.assert_not_called()
3832 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self
):
3833 """ns_vld and vnf_vld both exist."""
3836 "ns-vld-id": "mgmtnet",
3837 "vnf-vld-id": "mgmt_cp_int",
3839 expected_result
= f
"{ns_preffix}:vld.mgmtnet"
3840 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3841 interface
, ns_preffix
, vnf_preffix
3843 self
.assertEqual(result
, expected_result
)
def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict produces an empty net text."""
    empty_interface = {}
    net_text = self.ns._check_vld_information_of_interfaces(
        empty_interface, ns_preffix, vnf_preffix
    )
    # Neither an ns-vld nor a vnf-vld id is present, so no text is expected.
    self.assertEqual(net_text, "")
3853 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self
):
3854 """Interface dict has only vnf_vld."""
3857 "vnf-vld-id": "mgmt_cp_int",
3859 expected_result
= f
"{vnf_preffix}:vld.mgmt_cp_int"
3860 result
= self
.ns
._check
_vld
_information
_of
_interfaces
(
3861 interface
, ns_preffix
, vnf_preffix
3863 self
.assertEqual(result
, expected_result
)
def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
    self,
):
    """Interface dict has only vnf_vld but vnf_preffix does not exist.

    With ``vnf_preffix`` set to None the call is expected to fail with a
    ``TypeError``. The expected type is passed to ``assertRaises`` directly:
    comparing ``type(err)`` with ``TypeError`` inspects the context-manager
    object rather than the raised exception, and a statement placed after the
    raising call inside the ``with`` block is never executed.
    """
    interface = {
        "name": "vdu-eth0",
        "vnf-vld-id": "mgmt_cp_int",
    }
    vnf_preffix = None
    with self.assertRaises(TypeError):
        self.ns._check_vld_information_of_interfaces(
            interface, ns_preffix, vnf_preffix
        )
3880 def test_prepare_interface_port_security_has_security_details(self
):
3881 """Interface dict has port security details."""
3884 "ns-vld-id": "mgmtnet",
3885 "vnf-vld-id": "mgmt_cp_int",
3886 "port-security-enabled": True,
3887 "port-security-disable-strategy": "allow-address-pairs",
3889 expected_interface
= {
3891 "ns-vld-id": "mgmtnet",
3892 "vnf-vld-id": "mgmt_cp_int",
3893 "port_security": True,
3894 "port_security_disable_strategy": "allow-address-pairs",
3896 self
.ns
._prepare
_interface
_port
_security
(interface
)
3897 self
.assertDictEqual(interface
, expected_interface
)
def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict is still empty after the call."""
    interface = {}
    self.ns._prepare_interface_port_security(interface)
    # The dict is checked after the call, so nothing may have been added to it.
    self.assertDictEqual(interface, {})
3906 def test_prepare_interface_port_security_wthout_port_security(self
):
3907 """Interface dict does not have port security details."""
3910 "ns-vld-id": "mgmtnet",
3911 "vnf-vld-id": "mgmt_cp_int",
3913 expected_interface
= {
3915 "ns-vld-id": "mgmtnet",
3916 "vnf-vld-id": "mgmt_cp_int",
3918 self
.ns
._prepare
_interface
_port
_security
(interface
)
3919 self
.assertDictEqual(interface
, expected_interface
)
3921 def test_create_net_item_of_interface_floating_ip_port_security(self
):
3922 """Interface dict has floating ip, port-security details."""
3925 "vcpi": "sample_vcpi",
3926 "port_security": True,
3927 "port_security_disable_strategy": "allow-address-pairs",
3928 "floating_ip": "10.1.1.12",
3929 "ns-vld-id": "mgmtnet",
3930 "vnf-vld-id": "mgmt_cp_int",
3932 net_text
= f
"{ns_preffix}"
3933 expected_net_item
= {
3935 "port_security": True,
3936 "port_security_disable_strategy": "allow-address-pairs",
3937 "floating_ip": "10.1.1.12",
3938 "net_id": f
"TASK-{ns_preffix}",
3941 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3942 self
.assertDictEqual(result
, expected_net_item
)
3944 def test_create_net_item_of_interface_invalid_net_text(self
):
3945 """net-text is invalid."""
3948 "vcpi": "sample_vcpi",
3949 "port_security": True,
3950 "port_security_disable_strategy": "allow-address-pairs",
3951 "floating_ip": "10.1.1.12",
3952 "ns-vld-id": "mgmtnet",
3953 "vnf-vld-id": "mgmt_cp_int",
3956 with self
.assertRaises(TypeError):
3957 self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3959 def test_create_net_item_of_interface_empty_interface(self
):
3960 """Interface dict is empty."""
3962 net_text
= ns_preffix
3963 expected_net_item
= {
3964 "net_id": f
"TASK-{ns_preffix}",
3967 result
= self
.ns
._create
_net
_item
_of
_interface
(interface
, net_text
)
3968 self
.assertDictEqual(result
, expected_net_item
)
3970 @patch("osm_ng_ro.ns.deep_get")
3971 def test_prepare_type_of_interface_type_sriov(self
, mock_deep_get
):
3972 """Interface type is SR-IOV."""
3975 "vcpi": "sample_vcpi",
3976 "port_security": True,
3977 "port_security_disable_strategy": "allow-address-pairs",
3978 "floating_ip": "10.1.1.12",
3979 "ns-vld-id": "mgmtnet",
3980 "vnf-vld-id": "mgmt_cp_int",
3983 mock_deep_get
.return_value
= "SR-IOV"
3984 net_text
= ns_preffix
3986 expected_net_item
= {
3991 self
.ns
._prepare
_type
_of
_interface
(
3992 interface
, tasks_by_target_record_id
, net_text
, net_item
3994 self
.assertDictEqual(net_item
, expected_net_item
)
3997 tasks_by_target_record_id
[net_text
]["extra_dict"]["params"]["net_type"],
3999 mock_deep_get
.assert_called_once_with(
4000 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4003 @patch("osm_ng_ro.ns.deep_get")
4004 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4007 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4010 "vcpi": "sample_vcpi",
4011 "port_security": True,
4012 "port_security_disable_strategy": "allow-address-pairs",
4013 "floating_ip": "10.1.1.12",
4014 "ns-vld-id": "mgmtnet",
4015 "vnf-vld-id": "mgmt_cp_int",
4016 "type": "PCI-PASSTHROUGH",
4018 mock_deep_get
.return_value
= {}
4019 tasks_by_target_record_id
= {}
4020 net_text
= ns_preffix
4022 expected_net_item
= {
4024 "model": "PCI-PASSTHROUGH",
4025 "type": "PCI-PASSTHROUGH",
4027 self
.ns
._prepare
_type
_of
_interface
(
4028 interface
, tasks_by_target_record_id
, net_text
, net_item
4030 self
.assertDictEqual(net_item
, expected_net_item
)
4031 mock_deep_get
.assert_called_once_with(
4032 tasks_by_target_record_id
, net_text
, "extra_dict", "params", "net_type"
4035 @patch("osm_ng_ro.ns.deep_get")
4036 def test_prepare_type_of_interface_type_mgmt(self
, mock_deep_get
):
4037 """Interface type is mgmt."""
4040 "vcpi": "sample_vcpi",
4041 "port_security": True,
4042 "port_security_disable_strategy": "allow-address-pairs",
4043 "floating_ip": "10.1.1.12",
4044 "ns-vld-id": "mgmtnet",
4045 "vnf-vld-id": "mgmt_cp_int",
4048 tasks_by_target_record_id
= {}
4049 net_text
= ns_preffix
4051 expected_net_item
= {
4054 self
.ns
._prepare
_type
_of
_interface
(
4055 interface
, tasks_by_target_record_id
, net_text
, net_item
4057 self
.assertDictEqual(net_item
, expected_net_item
)
4058 mock_deep_get
.assert_not_called()
4060 @patch("osm_ng_ro.ns.deep_get")
4061 def test_prepare_type_of_interface_type_bridge(self
, mock_deep_get
):
4062 """Interface type is bridge."""
4065 "vcpi": "sample_vcpi",
4066 "port_security": True,
4067 "port_security_disable_strategy": "allow-address-pairs",
4068 "floating_ip": "10.1.1.12",
4069 "ns-vld-id": "mgmtnet",
4070 "vnf-vld-id": "mgmt_cp_int",
4072 tasks_by_target_record_id
= {}
4073 net_text
= ns_preffix
4075 expected_net_item
= {
4079 self
.ns
._prepare
_type
_of
_interface
(
4080 interface
, tasks_by_target_record_id
, net_text
, net_item
4082 self
.assertDictEqual(net_item
, expected_net_item
)
4083 mock_deep_get
.assert_not_called()
4085 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4086 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4087 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4088 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4089 def test_prepare_vdu_interfaces(
4091 mock_type_of_interface
,
4092 mock_item_of_interface
,
4094 mock_vld_information_of_interface
,
4096 """Prepare vdu interfaces successfully."""
4097 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4100 "ns-vld-id": "net1",
4101 "ip-address": "13.2.12.31",
4102 "mgmt-interface": True,
4106 "vnf-vld-id": "net2",
4107 "mac-address": "d0:94:66:ed:fc:e2",
4111 "ns-vld-id": "mgmtnet",
4113 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4115 "params": "test_params",
4116 "find_params": "test_find_params",
4120 net_text_1
= f
"{ns_preffix}:net1"
4121 net_text_2
= f
"{vnf_preffix}:net2"
4122 net_text_3
= f
"{ns_preffix}:mgmtnet"
4125 "net_id": f
"TASK-{ns_preffix}",
4130 "net_id": f
"TASK-{ns_preffix}",
4135 "net_id": f
"TASK-{ns_preffix}",
4138 mock_item_of_interface
.side_effect
= [net_item_1
, net_item_2
, net_item_3
]
4139 mock_vld_information_of_interface
.side_effect
= [
4145 expected_extra_dict
= {
4146 "params": "test_params",
4147 "find_params": "test_find_params",
4148 "depends_on": [net_text_1
, net_text_2
, net_text_3
],
4149 "mgmt_vdu_interface": 0,
4151 updated_net_item1
= deepcopy(net_item_1
)
4152 updated_net_item1
.update({"ip_address": "13.2.12.31"})
4153 updated_net_item2
= deepcopy(net_item_2
)
4154 updated_net_item2
.update({"mac_address": "d0:94:66:ed:fc:e2"})
4155 expected_net_list
= [updated_net_item1
, updated_net_item2
, net_item_3
]
4156 self
.ns
._prepare
_vdu
_interfaces
(
4162 tasks_by_target_record_id
,
4165 _call_mock_vld_information_of_interface
= (
4166 mock_vld_information_of_interface
.call_args_list
4169 _call_mock_vld_information_of_interface
[0][0],
4170 (interface_1
, ns_preffix
, vnf_preffix
),
4173 _call_mock_vld_information_of_interface
[1][0],
4174 (interface_2
, ns_preffix
, vnf_preffix
),
4177 _call_mock_vld_information_of_interface
[2][0],
4178 (interface_3
, ns_preffix
, vnf_preffix
),
4181 _call_mock_port_security
= mock_port_security
.call_args_list
4182 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4183 self
.assertEqual(_call_mock_port_security
[1].args
[0], interface_2
)
4184 self
.assertEqual(_call_mock_port_security
[2].args
[0], interface_3
)
4186 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4187 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4188 self
.assertEqual(_call_mock_item_of_interface
[1][0], (interface_2
, net_text_2
))
4189 self
.assertEqual(_call_mock_item_of_interface
[2][0], (interface_3
, net_text_3
))
4191 _call_mock_type_of_interface
= mock_type_of_interface
.call_args_list
4193 _call_mock_type_of_interface
[0][0],
4194 (interface_1
, tasks_by_target_record_id
, net_text_1
, net_item_1
),
4197 _call_mock_type_of_interface
[1][0],
4198 (interface_2
, tasks_by_target_record_id
, net_text_2
, net_item_2
),
4201 _call_mock_type_of_interface
[2][0],
4202 (interface_3
, tasks_by_target_record_id
, net_text_3
, net_item_3
),
4204 self
.assertEqual(net_list
, expected_net_list
)
4205 self
.assertEqual(extra_dict
, expected_extra_dict
)
4206 self
.logger
.error
.assert_not_called()
4208 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4209 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4210 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4211 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4212 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4214 mock_type_of_interface
,
4215 mock_item_of_interface
,
4217 mock_vld_information_of_interface
,
4219 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4220 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4223 "ns-vld-id": "net1",
4224 "ip-address": "13.2.12.31",
4225 "mgmt-interface": True,
4229 "vnf-vld-id": "net2",
4230 "mac-address": "d0:94:66:ed:fc:e2",
4234 "ns-vld-id": "mgmtnet",
4236 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4238 "params": "test_params",
4239 "find_params": "test_find_params",
4242 net_text_1
= f
"{ns_preffix}:net1"
4243 mock_item_of_interface
.side_effect
= [TypeError, TypeError, TypeError]
4245 mock_vld_information_of_interface
.side_effect
= [net_text_1
]
4247 expected_extra_dict
= {
4248 "params": "test_params",
4249 "find_params": "test_find_params",
4250 "depends_on": [net_text_1
],
4252 with self
.assertRaises(TypeError):
4253 self
.ns
._prepare
_vdu
_interfaces
(
4259 tasks_by_target_record_id
,
4263 _call_mock_vld_information_of_interface
= (
4264 mock_vld_information_of_interface
.call_args_list
4267 _call_mock_vld_information_of_interface
[0][0],
4268 (interface_1
, ns_preffix
, vnf_preffix
),
4271 _call_mock_port_security
= mock_port_security
.call_args_list
4272 self
.assertEqual(_call_mock_port_security
[0].args
[0], interface_1
)
4274 _call_mock_item_of_interface
= mock_item_of_interface
.call_args_list
4275 self
.assertEqual(_call_mock_item_of_interface
[0][0], (interface_1
, net_text_1
))
4277 mock_type_of_interface
.assert_not_called()
4278 self
.logger
.error
.assert_not_called()
4279 self
.assertEqual(net_list
, [])
4280 self
.assertEqual(extra_dict
, expected_extra_dict
)
4282 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4283 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4284 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4285 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4286 def test_prepare_vdu_interfaces_vld_information_is_empty(
4288 mock_type_of_interface
,
4289 mock_item_of_interface
,
4291 mock_vld_information_of_interface
,
4293 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4294 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4297 "ns-vld-id": "net1",
4298 "ip-address": "13.2.12.31",
4299 "mgmt-interface": True,
4303 "vnf-vld-id": "net2",
4304 "mac-address": "d0:94:66:ed:fc:e2",
4308 "ns-vld-id": "mgmtnet",
4310 target_vdu
["interfaces"] = [interface_1
, interface_2
, interface_3
]
4312 "params": "test_params",
4313 "find_params": "test_find_params",
4316 mock_vld_information_of_interface
.side_effect
= ["", "", ""]
4318 self
.ns
._prepare
_vdu
_interfaces
(
4324 tasks_by_target_record_id
,
4328 _call_mock_vld_information_of_interface
= (
4329 mock_vld_information_of_interface
.call_args_list
4332 _call_mock_vld_information_of_interface
[0][0],
4333 (interface_1
, ns_preffix
, vnf_preffix
),
4336 _call_mock_vld_information_of_interface
[1][0],
4337 (interface_2
, ns_preffix
, vnf_preffix
),
4340 _call_mock_vld_information_of_interface
[2][0],
4341 (interface_3
, ns_preffix
, vnf_preffix
),
4344 _call_logger
= self
.logger
.error
.call_args_list
4347 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4351 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4355 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4357 self
.assertEqual(net_list
, [])
4361 "params": "test_params",
4362 "find_params": "test_find_params",
4367 mock_item_of_interface
.assert_not_called()
4368 mock_port_security
.assert_not_called()
4369 mock_type_of_interface
.assert_not_called()
4371 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4372 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4373 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4374 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4375 def test_prepare_vdu_interfaces_empty_interface_list(
4377 mock_type_of_interface
,
4378 mock_item_of_interface
,
4380 mock_vld_information_of_interface
,
4382 """Prepare vdu interfaces, interface list is empty."""
4383 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4384 target_vdu
["interfaces"] = []
4387 self
.ns
._prepare
_vdu
_interfaces
(
4393 tasks_by_target_record_id
,
4396 mock_type_of_interface
.assert_not_called()
4397 mock_vld_information_of_interface
.assert_not_called()
4398 mock_item_of_interface
.assert_not_called()
4399 mock_port_security
.assert_not_called()
def test_prepare_vdu_ssh_keys(self):
    """VDU ssh-keys and the RO public key both end up under "key-pairs".

    The VDU carries one ssh key and requires ssh access; the expected cloud
    config lists the VDU key first, then the RO NSR public key.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-keys"] = ["sample-ssh-key"]
    vdu["ssh-access-required"] = True
    nsr_public_key = {"public_key": "path_of_public_key"}
    cloud_cfg = {}

    self.ns._prepare_vdu_ssh_keys(vdu, nsr_public_key, cloud_cfg)

    self.assertDictEqual(
        cloud_cfg,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )
def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """Without "ssh-keys" set by the test, only the RO public key is listed."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-access-required"] = True
    nsr_public_key = {"public_key": "path_of_public_key"}
    cloud_cfg = {}

    self.ns._prepare_vdu_ssh_keys(vdu, nsr_public_key, cloud_cfg)

    # ssh access is required, so the RO key is expected as the single entry.
    self.assertDictEqual(
        cloud_cfg, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )
def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """With ssh access not required, only the VDU's own ssh key is listed."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-keys"] = ["sample-ssh-key"]
    vdu["ssh-access-required"] = False
    nsr_public_key = {"public_key": "path_of_public_key"}
    cloud_cfg = {}

    self.ns._prepare_vdu_ssh_keys(vdu, nsr_public_key, cloud_cfg)

    # The RO public key must not appear in the expected key pairs.
    self.assertDictEqual(cloud_cfg, {"key-pairs": ["sample-ssh-key"]})
4435 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4436 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4437 def test_add_persistent_root_disk_to_disk_list_keep_false(
4438 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4440 """Add persistent root disk to disk_list, keep volume set to False."""
4442 "id": "persistent-root-volume",
4443 "type-of-storage": "persistent-storage:persistent-storage",
4444 "size-of-storage": "10",
4446 mock_select_persistent_root_disk
.return_value
= root_disk
4447 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4448 vnfd
["virtual-storage-desc"][1] = root_disk
4449 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4450 persistent_root_disk
= {}
4452 mock_volume_keeping_required
.return_value
= False
4453 expected_disk_list
= [
4455 "image_id": "ubuntu20.04",
4460 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4461 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4463 self
.assertEqual(disk_list
, expected_disk_list
)
4464 mock_select_persistent_root_disk
.assert_called_once()
4465 mock_volume_keeping_required
.assert_called_once()
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Add persistent root disk to disk_list, root disk selection raises.

    The patched ``_select_persistent_root_disk`` raises ``AttributeError``.
    The test expects the error to reach the caller, the disk list to remain
    empty, the selection to have been attempted exactly once and the
    keep-volume check never to be reached.
    """
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.side_effect = AttributeError
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk = {}
    disk_list = []

    with self.assertRaises(AttributeError):
        self.ns._add_persistent_root_disk_to_disk_list(
            vnfd, target_vdu, persistent_root_disk, disk_list
        )
    # Post-conditions are verified after the context manager exits; placed
    # after the raising call inside the block they would never execute.
    self.assertEqual(disk_list, [])
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_not_called()
4492 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4493 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4494 def test_add_persistent_root_disk_to_disk_list_keep_true(
4495 self
, mock_volume_keeping_required
, mock_select_persistent_root_disk
4497 """Add persistent root disk, keeo volume set to True."""
4498 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4499 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4500 mock_volume_keeping_required
.return_value
= True
4502 "id": "persistent-root-volume",
4503 "type-of-storage": "persistent-storage:persistent-storage",
4504 "size-of-storage": "10",
4505 "vdu-storage-requirements": [
4506 {"key": "keep-volume", "value": "true"},
4509 mock_select_persistent_root_disk
.return_value
= root_disk
4510 persistent_root_disk
= {}
4512 expected_disk_list
= [
4514 "image_id": "ubuntu20.04",
4519 self
.ns
._add
_persistent
_root
_disk
_to
_disk
_list
(
4520 vnfd
, target_vdu
, persistent_root_disk
, disk_list
4522 self
.assertEqual(disk_list
, expected_disk_list
)
4523 mock_volume_keeping_required
.assert_called_once_with(root_disk
)
4525 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4526 def test_add_persistent_ordinary_disk_to_disk_list(
4527 self
, mock_volume_keeping_required
4529 """Add persistent ordinary disk, keeo volume set to True."""
4530 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4531 mock_volume_keeping_required
.return_value
= False
4532 persistent_root_disk
= {
4533 "persistent-root-volume": {
4534 "image_id": "ubuntu20.04",
4540 "id": "persistent-volume2",
4541 "type-of-storage": "persistent-storage:persistent-storage",
4542 "size-of-storage": "10",
4544 persistent_ordinary_disk
= {}
4546 expected_disk_list
= [
4552 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4553 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4555 self
.assertEqual(disk_list
, expected_disk_list
)
4556 mock_volume_keeping_required
.assert_called_once_with(ordinary_disk
)
4558 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4559 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
4560 self
, mock_volume_keeping_required
4562 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4563 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4564 mock_volume_keeping_required
.return_value
= False
4565 persistent_root_disk
= {
4566 "persistent-root-volume": {
4567 "image_id": "ubuntu20.04",
4571 "persistent-volume2": {
4575 persistent_ordinary_disk
= {}
4578 self
.ns
._add
_persistent
_ordinary
_disks
_to
_disk
_list
(
4579 target_vdu
, persistent_root_disk
, persistent_ordinary_disk
, disk_list
4581 self
.assertEqual(disk_list
, [])
4582 mock_volume_keeping_required
.assert_not_called()
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """No root disk is added when the VNFD has no persistent storage.

    The root-disk selector is patched to return None.  The test expects
    two selector calls, a disk list that is still empty afterwards and
    no keep-volume lookup.
    """
    # Arrange: fixtures without persistent storage, selector finds nothing.
    mock_select_persistent_root_disk.return_value = None
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    root_disks, disks = {}, []

    # Act
    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    # Assert
    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent_root_disk dict is empty and stays without a root disk.

    With the selector patched to return None, the call must leave the
    disk list empty, query the selector twice and never ask whether a
    volume has to be kept.
    """
    # Selector never finds a persistent root disk.
    mock_select_persistent_root_disk.return_value = None

    call_args = (
        deepcopy(vnfd_wthout_persistent_storage),
        deepcopy(target_vdu_wthout_persistent_storage),
        {},  # persistent_root_disk
        [],  # disk_list, inspected after the call
    )
    self.ns._add_persistent_root_disk_to_disk_list(*call_args)

    resulting_disks = call_args[3]
    self.assertEqual(resulting_disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()
4620 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self
):
4621 """Invalid extra dict."""
4622 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4623 target_vdu
["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4625 ns_preffix
= "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4626 with self
.assertRaises(NsException
) as err
:
4627 self
.ns
._prepare
_vdu
_affinity
_group
_list
(target_vdu
, extra_dict
, ns_preffix
)
4628 self
.assertEqual(str(err
.exception
), "Invalid extra_dict format.")
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """One affinity group gives one task entry and one dependency.

    The VDU carries a single affinity group id; the returned list must
    hold the matching "TASK-" prefixed id and ``extra_dict`` must gain
    the same record id under "depends_on".
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    affinity_group_txt = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"

    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    extra = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, extra, ns_preffix
    )

    # extra is updated in place by the call under test.
    self.assertDictEqual(extra, {"depends_on": [affinity_group_txt]})
    self.assertEqual(
        affinity_groups, [{"affinity_group_id": "TASK-" + affinity_group_txt}]
    )
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """Two affinity groups give two task entries and two dependencies.

    Both the returned list and the "depends_on" entries of
    ``extra_dict`` are expected in the order the group ids appear on
    the VDU.
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_record_ids = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2",
    ]

    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    extra = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, extra, ns_preffix
    )

    # extra is updated in place by the call under test.
    self.assertDictEqual(extra, {"depends_on": list(group_record_ids)})
    self.assertEqual(
        affinity_groups,
        [{"affinity_group_id": "TASK-" + record_id} for record_id in group_record_ids],
    )
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """Without any affinity group nothing is returned or added.

    The VDU fixture is used as-is (no affinity group id is set here);
    the result must be an empty list and "depends_on" must stay empty.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    extra = {"depends_on": []}

    affinity_groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, extra, "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    )

    self.assertDictEqual(extra, {"depends_on": []})
    self.assertEqual(affinity_groups, [])
4678 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4679 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4680 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4681 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4682 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4683 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4684 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4685 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4686 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4687 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4688 def test_process_vdu_params_with_inst_vol_list(
4690 mock_prepare_vdu_affinity_group_list
,
4691 mock_add_persistent_ordinary_disks_to_disk_list
,
4692 mock_add_persistent_root_disk_to_disk_list
,
4693 mock_find_persistent_volumes
,
4694 mock_find_persistent_root_volumes
,
4695 mock_prepare_vdu_ssh_keys
,
4696 mock_prepare_vdu_cloud_init
,
4697 mock_prepare_vdu_interfaces
,
4698 mock_locate_vdu_interfaces
,
4699 mock_sort_vdu_interfaces
,
4701 """Instantiation volume list is provided."""
4702 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
4704 target_vdu
["interfaces"] = interfaces_wth_all_positions
4706 vdu_instantiation_vol_list
= [
4708 "vim-volume-id": vim_volume_id
,
4709 "name": "persistent-volume2",
4712 target_vdu
["additionalParams"] = {
4713 "OSM": {"vdu_volumes": vdu_instantiation_vol_list
}
4715 mock_prepare_vdu_cloud_init
.return_value
= {}
4716 mock_prepare_vdu_affinity_group_list
.return_value
= []
4717 persistent_root_disk
= {
4718 "persistent-root-volume": {
4719 "image_id": "ubuntu20.04",
4723 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
4725 new_kwargs
= deepcopy(kwargs
)
4730 "tasks_by_target_record_id": {},
4734 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
4735 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4736 db
.get_one
.return_value
= vnfd
4737 result
= Ns
._process
_vdu
_params
(
4738 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4740 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4741 mock_locate_vdu_interfaces
.assert_not_called()
4742 mock_prepare_vdu_cloud_init
.assert_called_once()
4743 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
4744 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
4745 mock_prepare_vdu_interfaces
.assert_called_once_with(
4747 expected_extra_dict_copy
,
4754 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4755 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4756 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4757 mock_find_persistent_volumes
.assert_called_once_with(
4758 persistent_root_disk
, target_vdu
, vdu_instantiation_vol_list
, []
4761 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4762 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4763 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4764 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4765 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4766 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4767 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4768 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4769 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4770 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4771 def test_process_vdu_params_with_inst_flavor_id(
4773 mock_prepare_vdu_affinity_group_list
,
4774 mock_add_persistent_ordinary_disks_to_disk_list
,
4775 mock_add_persistent_root_disk_to_disk_list
,
4776 mock_find_persistent_volumes
,
4777 mock_find_persistent_root_volumes
,
4778 mock_prepare_vdu_ssh_keys
,
4779 mock_prepare_vdu_cloud_init
,
4780 mock_prepare_vdu_interfaces
,
4781 mock_locate_vdu_interfaces
,
4782 mock_sort_vdu_interfaces
,
4784 """Instantiation flavor id is provided."""
4785 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4787 target_vdu
["interfaces"] = interfaces_wth_all_positions
4789 vdu_instantiation_flavor_id
= "flavor_test"
4791 target_vdu
["additionalParams"] = {
4792 "OSM": {"vim_flavor_id": vdu_instantiation_flavor_id
}
4794 mock_prepare_vdu_cloud_init
.return_value
= {}
4795 mock_prepare_vdu_affinity_group_list
.return_value
= []
4797 new_kwargs
= deepcopy(kwargs
)
4802 "tasks_by_target_record_id": {},
4806 expected_extra_dict_copy
= deepcopy(expected_extra_dict3
)
4807 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4808 db
.get_one
.return_value
= vnfd
4809 result
= Ns
._process
_vdu
_params
(
4810 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4812 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4813 mock_locate_vdu_interfaces
.assert_not_called()
4814 mock_prepare_vdu_cloud_init
.assert_called_once()
4815 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4816 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4817 mock_prepare_vdu_interfaces
.assert_called_once_with(
4819 expected_extra_dict_copy
,
4826 self
.assertDictEqual(result
, expected_extra_dict_copy
)
4827 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4828 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4829 mock_find_persistent_volumes
.assert_not_called()
4831 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4832 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4833 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4834 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4835 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4836 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4837 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4838 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4839 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4840 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4841 def test_process_vdu_params_wth_affinity_groups(
4843 mock_prepare_vdu_affinity_group_list
,
4844 mock_add_persistent_ordinary_disks_to_disk_list
,
4845 mock_add_persistent_root_disk_to_disk_list
,
4846 mock_find_persistent_volumes
,
4847 mock_find_persistent_root_volumes
,
4848 mock_prepare_vdu_ssh_keys
,
4849 mock_prepare_vdu_cloud_init
,
4850 mock_prepare_vdu_interfaces
,
4851 mock_locate_vdu_interfaces
,
4852 mock_sort_vdu_interfaces
,
4854 """There are affinity groups."""
4855 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4858 target_vdu
["interfaces"] = interfaces_wth_all_positions
4859 mock_prepare_vdu_cloud_init
.return_value
= {}
4860 mock_prepare_vdu_affinity_group_list
.return_value
= [
4865 new_kwargs
= deepcopy(kwargs
)
4870 "tasks_by_target_record_id": {},
4874 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4875 expected_extra_dict3
["params"]["affinity_group_list"] = [
4879 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4880 db
.get_one
.return_value
= vnfd
4881 result
= Ns
._process
_vdu
_params
(
4882 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4884 self
.assertDictEqual(result
, expected_extra_dict3
)
4885 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4886 mock_locate_vdu_interfaces
.assert_not_called()
4887 mock_prepare_vdu_cloud_init
.assert_called_once()
4888 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4889 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4890 mock_prepare_vdu_interfaces
.assert_called_once_with(
4892 expected_extra_dict3
,
4900 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
4901 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4902 mock_find_persistent_volumes
.assert_not_called()
4904 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4905 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4906 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4907 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4908 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4909 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4910 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4911 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4912 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4913 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4914 def test_process_vdu_params_wth_cloud_config(
4916 mock_prepare_vdu_affinity_group_list
,
4917 mock_add_persistent_ordinary_disks_to_disk_list
,
4918 mock_add_persistent_root_disk_to_disk_list
,
4919 mock_find_persistent_volumes
,
4920 mock_find_persistent_root_volumes
,
4921 mock_prepare_vdu_ssh_keys
,
4922 mock_prepare_vdu_cloud_init
,
4923 mock_prepare_vdu_interfaces
,
4924 mock_locate_vdu_interfaces
,
4925 mock_sort_vdu_interfaces
,
4927 """There is cloud-config."""
4928 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
4931 target_vdu
["interfaces"] = interfaces_wth_all_positions
4932 mock_prepare_vdu_cloud_init
.return_value
= {
4933 "user-data": user_data
,
4934 "boot-data-drive": "vda",
4936 mock_prepare_vdu_affinity_group_list
.return_value
= []
4938 new_kwargs
= deepcopy(kwargs
)
4943 "tasks_by_target_record_id": {},
4947 expected_extra_dict3
= deepcopy(expected_extra_dict2
)
4948 expected_extra_dict3
["params"]["cloud_config"] = {
4949 "user-data": user_data
,
4950 "boot-data-drive": "vda",
4952 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
4953 db
.get_one
.return_value
= vnfd
4954 result
= Ns
._process
_vdu
_params
(
4955 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
4957 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
4958 mock_locate_vdu_interfaces
.assert_not_called()
4959 mock_prepare_vdu_cloud_init
.assert_called_once()
4960 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
4961 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
4962 mock_prepare_vdu_interfaces
.assert_called_once_with(
4964 expected_extra_dict3
,
4971 self
.assertDictEqual(result
, expected_extra_dict3
)
4972 mock_prepare_vdu_ssh_keys
.assert_called_once_with(
4973 target_vdu
, None, {"user-data": user_data
, "boot-data-drive": "vda"}
4975 mock_prepare_vdu_affinity_group_list
.assert_called_once()
4976 mock_find_persistent_volumes
.assert_not_called()
4978 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4979 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4980 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4981 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4982 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4983 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4984 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4985 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4986 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4987 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4988 def test_process_vdu_params_wthout_persistent_storage(
4990 mock_prepare_vdu_affinity_group_list
,
4991 mock_add_persistent_ordinary_disks_to_disk_list
,
4992 mock_add_persistent_root_disk_to_disk_list
,
4993 mock_find_persistent_volumes
,
4994 mock_find_persistent_root_volumes
,
4995 mock_prepare_vdu_ssh_keys
,
4996 mock_prepare_vdu_cloud_init
,
4997 mock_prepare_vdu_interfaces
,
4998 mock_locate_vdu_interfaces
,
4999 mock_sort_vdu_interfaces
,
5001 """There is not any persistent storage."""
5002 target_vdu
= deepcopy(target_vdu_wthout_persistent_storage
)
5005 target_vdu
["interfaces"] = interfaces_wth_all_positions
5006 mock_prepare_vdu_cloud_init
.return_value
= {}
5007 mock_prepare_vdu_affinity_group_list
.return_value
= []
5009 new_kwargs
= deepcopy(kwargs
)
5014 "tasks_by_target_record_id": {},
5018 expected_extra_dict_copy
= deepcopy(expected_extra_dict2
)
5019 vnfd
= deepcopy(vnfd_wthout_persistent_storage
)
5020 db
.get_one
.return_value
= vnfd
5021 result
= Ns
._process
_vdu
_params
(
5022 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5024 mock_sort_vdu_interfaces
.assert_called_once_with(target_vdu
)
5025 mock_locate_vdu_interfaces
.assert_not_called()
5026 mock_prepare_vdu_cloud_init
.assert_called_once()
5027 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5028 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5029 mock_prepare_vdu_interfaces
.assert_called_once_with(
5031 expected_extra_dict_copy
,
5038 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5039 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5040 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5041 mock_find_persistent_volumes
.assert_not_called()
5043 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5044 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5045 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5046 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5047 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5048 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5049 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5050 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5051 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5052 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5053 def test_process_vdu_params_interfaces_partially_located(
5055 mock_prepare_vdu_affinity_group_list
,
5056 mock_add_persistent_ordinary_disks_to_disk_list
,
5057 mock_add_persistent_root_disk_to_disk_list
,
5058 mock_find_persistent_volumes
,
5059 mock_find_persistent_root_volumes
,
5060 mock_prepare_vdu_ssh_keys
,
5061 mock_prepare_vdu_cloud_init
,
5062 mock_prepare_vdu_interfaces
,
5063 mock_locate_vdu_interfaces
,
5064 mock_sort_vdu_interfaces
,
5066 """Some interfaces have position."""
5067 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5070 target_vdu
["interfaces"] = [
5073 "ns-vld-id": "net1",
5075 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5078 "ns-vld-id": "mgmtnet",
5081 mock_prepare_vdu_cloud_init
.return_value
= {}
5082 mock_prepare_vdu_affinity_group_list
.return_value
= []
5083 persistent_root_disk
= {
5084 "persistent-root-volume": {
5085 "image_id": "ubuntu20.04",
5090 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5092 new_kwargs
= deepcopy(kwargs
)
5097 "tasks_by_target_record_id": {},
5102 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5103 db
.get_one
.return_value
= vnfd
5104 result
= Ns
._process
_vdu
_params
(
5105 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5107 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5108 mock_sort_vdu_interfaces
.assert_not_called()
5109 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5110 mock_prepare_vdu_cloud_init
.assert_called_once()
5111 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5112 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5113 mock_prepare_vdu_interfaces
.assert_called_once_with(
5115 expected_extra_dict_copy
,
5122 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5123 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5124 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5125 mock_find_persistent_volumes
.assert_not_called()
5126 mock_find_persistent_root_volumes
.assert_not_called()
5128 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5129 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5130 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5131 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5132 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5133 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5134 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5135 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5136 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5137 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5138 def test_process_vdu_params_no_interface_position(
5140 mock_prepare_vdu_affinity_group_list
,
5141 mock_add_persistent_ordinary_disks_to_disk_list
,
5142 mock_add_persistent_root_disk_to_disk_list
,
5143 mock_find_persistent_volumes
,
5144 mock_find_persistent_root_volumes
,
5145 mock_prepare_vdu_ssh_keys
,
5146 mock_prepare_vdu_cloud_init
,
5147 mock_prepare_vdu_interfaces
,
5148 mock_locate_vdu_interfaces
,
5149 mock_sort_vdu_interfaces
,
5151 """Interfaces do not have position."""
5152 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5155 target_vdu
["interfaces"] = interfaces_wthout_positions
5156 mock_prepare_vdu_cloud_init
.return_value
= {}
5157 mock_prepare_vdu_affinity_group_list
.return_value
= []
5158 persistent_root_disk
= {
5159 "persistent-root-volume": {
5160 "image_id": "ubuntu20.04",
5165 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5166 new_kwargs
= deepcopy(kwargs
)
5171 "tasks_by_target_record_id": {},
5176 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5177 db
.get_one
.return_value
= vnfd
5178 result
= Ns
._process
_vdu
_params
(
5179 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5181 expected_extra_dict_copy
= deepcopy(expected_extra_dict
)
5182 mock_sort_vdu_interfaces
.assert_not_called()
5183 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5184 mock_prepare_vdu_cloud_init
.assert_called_once()
5185 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5186 mock_add_persistent_ordinary_disks_to_disk_list
.assert_called_once()
5187 mock_prepare_vdu_interfaces
.assert_called_once_with(
5189 expected_extra_dict_copy
,
5196 self
.assertDictEqual(result
, expected_extra_dict_copy
)
5197 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5198 mock_prepare_vdu_affinity_group_list
.assert_called_once()
5199 mock_find_persistent_volumes
.assert_not_called()
5200 mock_find_persistent_root_volumes
.assert_not_called()
5202 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5203 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5204 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5205 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5206 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5207 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5208 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5209 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5210 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5211 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5212 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5214 mock_prepare_vdu_affinity_group_list
,
5215 mock_add_persistent_ordinary_disks_to_disk_list
,
5216 mock_add_persistent_root_disk_to_disk_list
,
5217 mock_find_persistent_volumes
,
5218 mock_find_persistent_root_volumes
,
5219 mock_prepare_vdu_ssh_keys
,
5220 mock_prepare_vdu_cloud_init
,
5221 mock_prepare_vdu_interfaces
,
5222 mock_locate_vdu_interfaces
,
5223 mock_sort_vdu_interfaces
,
5225 """Prepare vdu interfaces method raises exception."""
5226 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5229 target_vdu
["interfaces"] = interfaces_wthout_positions
5230 mock_prepare_vdu_cloud_init
.return_value
= {}
5231 mock_prepare_vdu_affinity_group_list
.return_value
= []
5232 persistent_root_disk
= {
5233 "persistent-root-volume": {
5234 "image_id": "ubuntu20.04",
5239 mock_find_persistent_root_volumes
.return_value
= persistent_root_disk
5240 new_kwargs
= deepcopy(kwargs
)
5245 "tasks_by_target_record_id": {},
5249 mock_prepare_vdu_interfaces
.side_effect
= TypeError
5251 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5252 db
.get_one
.return_value
= vnfd
5253 with self
.assertRaises(Exception) as err
:
5254 Ns
._process
_vdu
_params
(
5255 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5257 self
.assertEqual(type(err
), TypeError)
5258 mock_sort_vdu_interfaces
.assert_not_called()
5259 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5260 mock_prepare_vdu_cloud_init
.assert_not_called()
5261 mock_add_persistent_root_disk_to_disk_list
.assert_not_called()
5262 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5263 mock_prepare_vdu_interfaces
.assert_called_once()
5264 mock_prepare_vdu_ssh_keys
.assert_not_called()
5265 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5266 mock_find_persistent_volumes
.assert_not_called()
5267 mock_find_persistent_root_volumes
.assert_not_called()
5269 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5270 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5271 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5272 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5273 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5274 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5275 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5276 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5277 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5278 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5279 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5281 mock_prepare_vdu_affinity_group_list
,
5282 mock_add_persistent_ordinary_disks_to_disk_list
,
5283 mock_add_persistent_root_disk_to_disk_list
,
5284 mock_find_persistent_volumes
,
5285 mock_find_persistent_root_volumes
,
5286 mock_prepare_vdu_ssh_keys
,
5287 mock_prepare_vdu_cloud_init
,
5288 mock_prepare_vdu_interfaces
,
5289 mock_locate_vdu_interfaces
,
5290 mock_sort_vdu_interfaces
,
5292 """Add persistent root disk method raises exception."""
5293 target_vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5296 target_vdu
["interfaces"] = interfaces_wthout_positions
5297 mock_prepare_vdu_cloud_init
.return_value
= {}
5298 mock_prepare_vdu_affinity_group_list
.return_value
= []
5299 mock_add_persistent_root_disk_to_disk_list
.side_effect
= KeyError
5300 new_kwargs
= deepcopy(kwargs
)
5305 "tasks_by_target_record_id": {},
5310 vnfd
= deepcopy(vnfd_wth_persistent_storage
)
5311 db
.get_one
.return_value
= vnfd
5312 with self
.assertRaises(Exception) as err
:
5313 Ns
._process
_vdu
_params
(
5314 target_vdu
, indata
, vim_info
=None, target_record_id
=None, **new_kwargs
5316 self
.assertEqual(type(err
), KeyError)
5317 mock_sort_vdu_interfaces
.assert_not_called()
5318 mock_locate_vdu_interfaces
.assert_called_once_with(target_vdu
)
5319 mock_prepare_vdu_cloud_init
.assert_called_once()
5320 mock_add_persistent_root_disk_to_disk_list
.assert_called_once()
5321 mock_add_persistent_ordinary_disks_to_disk_list
.assert_not_called()
5322 mock_prepare_vdu_interfaces
.assert_called_once_with(
5326 f
"{ns_preffix}:image.0",
5327 f
"{ns_preffix}:flavor.0",
5337 mock_prepare_vdu_ssh_keys
.assert_called_once_with(target_vdu
, None, {})
5338 mock_prepare_vdu_affinity_group_list
.assert_not_called()
5339 mock_find_persistent_volumes
.assert_not_called()
5340 mock_find_persistent_root_volumes
.assert_not_called()
5342 def test_select_persistent_root_disk(self
):
5343 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5344 vdu
["virtual-storage-desc"] = [
5345 "persistent-root-volume",
5346 "persistent-volume2",
5349 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5350 expected_result
= vsd
5351 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5352 self
.assertEqual(result
, expected_result
)
5354 def test_select_persistent_root_disk_first_vsd_is_different(self
):
5355 """VDU first virtual-storage-desc is different than vsd id."""
5356 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5357 vdu
["virtual-storage-desc"] = [
5358 "persistent-volume2",
5359 "persistent-root-volume",
5362 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5363 expected_result
= None
5364 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5365 self
.assertEqual(result
, expected_result
)
5367 def test_select_persistent_root_disk_vsd_is_not_persistent(self
):
5368 """vsd type is not persistent."""
5369 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5370 vdu
["virtual-storage-desc"] = [
5371 "persistent-volume2",
5372 "persistent-root-volume",
5375 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5376 vsd
["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5377 expected_result
= None
5378 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5379 self
.assertEqual(result
, expected_result
)
5381 def test_select_persistent_root_disk_vsd_does_not_have_size(self
):
5382 """vsd size is None."""
5383 vdu
= deepcopy(target_vdu_wth_persistent_storage
)
5384 vdu
["virtual-storage-desc"] = [
5385 "persistent-volume2",
5386 "persistent-root-volume",
5389 vsd
= deepcopy(vnfd_wth_persistent_storage
)["virtual-storage-desc"][1]
5390 vsd
["size-of-storage"] = None
5391 expected_result
= None
5392 result
= Ns
._select
_persistent
_root
_disk
(vsd
, vdu
)
5393 self
.assertEqual(result
, expected_result
)
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """VDU does not have virtual-storage-desc, so no root disk is selected.

    The VDU fixture is passed unchanged together with the second
    virtual-storage-desc entry of the VNFD fixture; the selector is
    expected to return None.
    """
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    unchanged_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.assertEqual(
        Ns._select_persistent_root_disk(storage_desc, unchanged_vdu), None
    )
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """vsd is list, expected to be a dict.

    The whole virtual-storage-desc list of the VNFD fixture is passed
    where a single entry is expected; the call must raise
    AttributeError.
    """
    all_storage_descs = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
    unchanged_vdu = deepcopy(target_vdu_wth_persistent_storage)

    # Callable form of assertRaises: same check as the context-manager form.
    self.assertRaises(
        AttributeError,
        Ns._select_persistent_root_disk,
        all_storage_descs,
        unchanged_vdu,
    )