d96d6a2389fc53785c841dc71ed6896d49fcf0db
[osm/RO.git] / NG-RO / osm_ng_ro / tests / test_ns.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy import deepcopy
18 import unittest
19 from unittest.mock import MagicMock, Mock, patch
20
21 from jinja2 import (
22 Environment,
23 select_autoescape,
24 StrictUndefined,
25 TemplateError,
26 TemplateNotFound,
27 UndefinedError,
28 )
29 from osm_ng_ro.ns import Ns, NsException
30
31
32 __author__ = "Eduardo Sousa"
33 __date__ = "$19-NOV-2021 00:00:00$"
34
35
36 # Variables used in Tests
# VNF descriptor fixture with a single VDU ("several_volumes-VM") that
# references three virtual storage descriptors: two persistent volumes
# (one of them flagged keep-volume=true) and one ephemeral volume.
vnfd_wth_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
        }
    ],
    "id": "several_volumes-vnf",
    "product-name": "several_volumes-vnf",
    "vdu": [
        {
            "id": "several_volumes-VM",
            "name": "several_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": [
                "ubuntu20.04-aws",
                "ubuntu20.04-azure",
            ],
            # Ids below refer to entries of the top-level "virtual-storage-desc".
            "virtual-storage-desc": [
                "persistent-root-volume",
                "persistent-volume2",
                "ephemeral-volume",
            ],
        }
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {
            "id": "persistent-volume2",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
        },
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {
            "fs": "mongo",
            "path": "/app/storage/",
        },
        "type": "vnfd",
    },
}
# Sample VIM volume identifier used by the tests.
vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3"
# Task lookup keyed by target record id; the only entry carries an
# SR-IOV net_type inside its extra_dict params.
task_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {"params": {"net_type": "SR-IOV"}}
    }
}
# Three VDU interfaces with no "position" key at all.
interfaces_wthout_positions = [
    {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
    },
    {
        "name": "vdu-eth2",
        "ns-vld-id": "net2",
    },
    {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
    },
]
# The same three interfaces, each with an explicit "position"; the listed
# order (2, 0, 1) deliberately differs from the position order.
interfaces_wth_all_positions = [
    {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "position": 2,
    },
    {
        "name": "vdu-eth2",
        "ns-vld-id": "net2",
        "position": 0,
    },
    {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
        "position": 1,
    },
]
# Target VDU fixture for "several_volumes-VM": one management interface and
# three virtual storages whose ids, sizes and types mirror the
# "virtual-storage-desc" entries of vnfd_wth_persistent_storage.
target_vdu_wth_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "several_volumes-VM",
    "interfaces": [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        }
    ],
    "virtual-storages": [
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        },
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
# Stand-ins for the collaborators handed to the code under test.  Each mock
# gets its own name so that reprs and assertion-failure messages tell the two
# apart (both were previously labelled "database mock").
db = MagicMock(name="database mock")
# NOTE(review): "fs" is presumably the filesystem/storage handle - confirm.
fs = MagicMock(name="filesystem mock")
# Record-id prefixes; each one is "<collection>:<id>" built from the ids below.
ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
# Minimal input payload; only the "name" key is provided.
indata = {
    "name": "sample_name",
}
# Expected extra_dict for the "several_volumes-VM" VDU: it depends on the
# image and flavor tasks registered under ns_preffix and starts with empty
# disk, network and affinity-group lists.
expected_extra_dict = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "description": "several_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}

# Same layout for the "without_volumes-VM" VDU; only "description" and
# "name" differ from expected_extra_dict.
expected_extra_dict2 = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "description": "without_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}
# Task lookup keyed by target record id.  The content is equal to
# task_by_target_record_id, but this is a separate dict object.
tasks_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {
            "params": {
                "net_type": "SR-IOV",
            }
        }
    }
}
# Keyword arguments shared by tests: a fresh MagicMock as "db", an empty
# vdu2cloud_init mapping and a VNFR whose "vnfd-id" matches the "_id" of the
# VNFD fixtures in this module.
kwargs = {
    "db": MagicMock(),
    "vdu2cloud_init": {},
    "vnfr": {
        "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "member-vnf-index-ref": "vnf-several-volumes",
    },
}
# VNF descriptor fixture with a single VDU ("without_volumes-VM") and no
# persistent storage: a root volume that declares no "type-of-storage" and
# one ephemeral volume.
vnfd_wthout_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
        }
    ],
    "id": "without_volumes-vnf",
    "product-name": "without_volumes-vnf",
    "vdu": [
        {
            "id": "without_volumes-VM",
            "name": "without_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": [
                "ubuntu20.04-aws",
                "ubuntu20.04-azure",
            ],
            # Ids below refer to entries of the top-level "virtual-storage-desc".
            "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
        }
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {"id": "root-volume", "size-of-storage": "10"},
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {
            "fs": "mongo",
            "path": "/app/storage/",
        },
        "type": "vnfd",
    },
}
265
# Target VDU fixture for "without_volumes-VM": one management interface, a
# root volume without "type-of-storage" and one ephemeral volume.
target_vdu_wthout_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "without_volumes-VM",
    "interfaces": [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        }
    ],
    "virtual-storages": [
        {
            "id": "root-volume",
            "size-of-storage": "10",
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
289 cloud_init_content = """
290 disk_setup:
291 ephemeral0:
292 table_type: {{type}}
293 layout: True
294 overwrite: {{is_override}}
295 runcmd:
296 - [ ls, -l, / ]
297 - [ sh, -xc, "echo $(date) '{{command}}'" ]
298 """
299
300 user_data = """
301 disk_setup:
302 ephemeral0:
303 table_type: mbr
304 layout: True
305 overwrite: False
306 runcmd:
307 - [ ls, -l, / ]
308 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
309 """
310
311
class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments of each call.

    Because the recorded arguments are copies, call assertions compare
    against the values at call time even if the caller later mutates the
    objects it passed in.
    """

    def __call__(self, *args, **kwargs):
        # Positional and keyword arguments are copied independently.
        copied_args = deepcopy(args)
        copied_kwargs = deepcopy(kwargs)
        return super().__call__(*copied_args, **copied_kwargs)
317
318
319 class TestNs(unittest.TestCase):
def setUp(self):
    """Intentionally empty: no per-test fixture is prepared here."""
322
def test__create_task_without_extra_dict(self):
    """_create_task without extra_dict returns only the base task fields.

    Also checks that "task_index" in deployment_info goes from 1 to 2.
    """
    deployment = {"action_id": "123456", "nsr_id": "654321", "task_index": 1}
    wanted_task = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
    }

    created = Ns._create_task(
        deployment_info=deployment,
        target_id="vim_openstack_1",
        item="test_item",
        action="CREATE",
        target_record="test_target_record",
        target_record_id="test_target_record_id",
    )

    self.assertEqual(deployment.get("task_index"), 2)
    self.assertDictEqual(created, wanted_task)
352
def test__create_task(self):
    """_create_task merges the extra_dict entries into the base task fields.

    Also checks that "task_index" in deployment_info goes from 1 to 2.
    """
    deployment = {"action_id": "123456", "nsr_id": "654321", "task_index": 1}
    extra = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }
    wanted_task = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
        # the three entries below are expected to come from extra_dict
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }

    created = Ns._create_task(
        deployment_info=deployment,
        target_id="vim_openstack_1",
        item="test_item",
        action="CREATE",
        target_record="test_target_record",
        target_record_id="test_target_record_id",
        extra_dict=extra,
    )

    self.assertEqual(deployment.get("task_index"), 2)
    self.assertDictEqual(created, wanted_task)
391
@patch("osm_ng_ro.ns.time")
def test__create_ro_task(self, mock_time: Mock):
    """_create_ro_task builds the expected ro_task record around a task.

    ``osm_ng_ro.ns.time`` is patched so the three timestamps are predictable.
    """
    frozen_now = 1637324838.994551
    mock_time.return_value = frozen_now

    task = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
        # values coming from extra_dict
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }
    untouched_vim_info = {
        "created": False,
        "created_items": None,
        "vim_id": None,
        "vim_name": None,
        "vim_status": None,
        "vim_details": None,
        "vim_message": None,
        "refresh_at": None,
    }
    wanted_ro_task = {
        "_id": "123456:1",
        "locked_by": None,
        "locked_at": 0.0,
        "target_id": "vim_openstack_1",
        "vim_info": untouched_vim_info,
        "modified_at": frozen_now,
        "created_at": frozen_now,
        "to_check_at": frozen_now,
        "tasks": [task],
    }

    ro_task = Ns._create_ro_task(target_id="vim_openstack_1", task=task)

    self.assertDictEqual(ro_task, wanted_ro_task)
438
def test__process_image_params_with_empty_target_image(self):
    """An empty target_image is expected to give empty find_params."""
    outcome = Ns._process_image_params(
        target_image={},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual({"find_params": {}}, outcome)
453
def test__process_image_params_with_wrong_target_image(self):
    """A target_image with only an unrelated key gives empty find_params."""
    unrelated_image = {"no_image": "to_see_here"}

    outcome = Ns._process_image_params(
        target_image=unrelated_image,
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual({"find_params": {}}, outcome)
470
def test__process_image_params_with_image(self):
    """The "image" key is expected as a filter on "name"."""
    wanted = {"find_params": {"filter_dict": {"name": "cirros"}}}

    outcome = Ns._process_image_params(
        target_image={"image": "cirros"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(wanted, outcome)
491
def test__process_image_params_with_vim_image_id(self):
    """The "vim_image_id" key is expected as a filter on "id"."""
    wanted = {"find_params": {"filter_dict": {"id": "123456"}}}

    outcome = Ns._process_image_params(
        target_image={"vim_image_id": "123456"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(wanted, outcome)
512
def test__process_image_params_with_image_checksum(self):
    """The "image_checksum" key is expected as a filter on "checksum"."""
    wanted = {
        "find_params": {
            "filter_dict": {"checksum": "e3fc50a88d0a364313df4b21ef20c29e"},
        },
    }

    outcome = Ns._process_image_params(
        target_image={"image_checksum": "e3fc50a88d0a364313df4b21ef20c29e"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(wanted, outcome)
533
def test__get_resource_allocation_params_with_empty_target_image(self):
    """An empty quota descriptor is expected to give an empty result.

    Despite the method name, the input here is a quota descriptor.
    """
    outcome = Ns._get_resource_allocation_params(quota_descriptor={})

    self.assertDictEqual({}, outcome)
543
def test__get_resource_allocation_params_with_wrong_target_image(self):
    """A quota descriptor with only an unrelated key gives an empty result.

    Despite the method name, the input here is a quota descriptor.
    """
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"no_quota": "present_here"},
    )

    self.assertDictEqual({}, outcome)
555
def test__get_resource_allocation_params_with_limit(self):
    """A string "limit" is expected back as the integer 10."""
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"limit": "10"},
    )

    self.assertDictEqual({"limit": 10}, outcome)
569
def test__get_resource_allocation_params_with_reserve(self):
    """A string "reserve" is expected back as the integer 20."""
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"reserve": "20"},
    )

    self.assertDictEqual({"reserve": 20}, outcome)
583
def test__get_resource_allocation_params_with_shares(self):
    """A string "shares" is expected back as the integer 30."""
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"shares": "30"},
    )

    self.assertDictEqual({"shares": 30}, outcome)
597
def test__get_resource_allocation_params(self):
    """limit, reserve and shares given as strings come back as integers."""
    descriptor = {"limit": "10", "reserve": "20", "shares": "30"}
    wanted = {"limit": 10, "reserve": 20, "shares": 30}

    outcome = Ns._get_resource_allocation_params(quota_descriptor=descriptor)

    self.assertDictEqual(wanted, outcome)
615
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
    self,
    resource_allocation,
):
    """Empty quota with epa_vcpu_set=True: empty result, helper not called."""
    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={},
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
632
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """Empty quota with epa_vcpu_set=False: empty result, helper not called."""
    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={},
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
649
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
    self,
    resource_allocation,
):
    """Unrelated quota key, epa_vcpu_set=True: empty result, helper unused."""
    unrelated_quota = {"no-quota": "nothing"}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=unrelated_quota,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
668
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """Unrelated quota key, epa_vcpu_set=False: empty result, helper unused."""
    unrelated_quota = {"no-quota": "nothing"}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=unrelated_quota,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
687
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
    self,
    resource_allocation,
):
    """A "cpu-quota" entry is expected to be ignored when epa_vcpu_set=True."""
    cpu_only_quota = {
        "cpu-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=cpu_only_quota,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
710
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """A "cpu-quota" entry is processed when epa_vcpu_set=False."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"cpu-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"cpu-quota": dict(raw_values)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
750
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
    self,
    resource_allocation,
):
    """A "mem-quota" entry is processed when epa_vcpu_set=True."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"mem-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"mem-quota": dict(raw_values)},
        epa_vcpu_set=True,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
790
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """A "mem-quota" entry is processed when epa_vcpu_set=False."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"mem-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"mem-quota": dict(raw_values)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
830
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
    self,
    resource_allocation,
):
    """A "disk-io-quota" entry is processed when epa_vcpu_set=True."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"disk-io-quota": dict(raw_values)},
        epa_vcpu_set=True,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
870
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """A "disk-io-quota" entry is processed when epa_vcpu_set=False."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"disk-io-quota": dict(raw_values)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
910
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
    self,
    resource_allocation,
):
    """A "vif-quota" entry is processed when epa_vcpu_set=True."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"vif-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"vif-quota": dict(raw_values)},
        epa_vcpu_set=True,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
950
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """A "vif-quota" entry is processed when epa_vcpu_set=False."""
    raw_values = {"limit": "10", "reserve": "20", "shares": "30"}
    # Separate dict objects for the mocked return value and the expectation.
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    wanted = {"vif-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"vif-quota": dict(raw_values)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(raw_values)
    self.assertDictEqual(wanted, outcome)
990
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_quota_epa_cpu(
    self,
    resource_allocation,
):
    """All four quotas with epa_vcpu_set=True: cpu-quota is left out.

    The helper is expected to run exactly once for each of mem-quota,
    disk-io-quota and vif-quota, and not at all for cpu-quota.
    """
    expected_result = {
        "mem-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
        "disk-io-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
        "vif-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
    }
    guest_epa_quota = {
        "cpu-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
        "mem-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
        "disk-io-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
        "vif-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
    }
    epa_vcpu_set = True

    resource_allocation.return_value = {
        "limit": 10,
        "reserve": 20,
        "shares": 30,
    }

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=guest_epa_quota,
        epa_vcpu_set=epa_vcpu_set,
    )

    # Pin the exact number of helper calls instead of "called at least once":
    # three quotas are processed, cpu-quota is skipped.
    self.assertEqual(resource_allocation.call_count, 3)
    self.assertDictEqual(expected_result, result)
1050
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
    self,
    resource_allocation,
):
    """All four quotas with epa_vcpu_set=False: every quota is returned.

    The helper is expected to run exactly once per quota, cpu-quota included.
    """
    expected_result = {
        "cpu-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
        "mem-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
        "disk-io-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
        "vif-quota": {
            "limit": 10,
            "reserve": 20,
            "shares": 30,
        },
    }
    guest_epa_quota = {
        "cpu-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
        "mem-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
        "disk-io-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
        "vif-quota": {
            "limit": "10",
            "reserve": "20",
            "shares": "30",
        },
    }
    epa_vcpu_set = False

    resource_allocation.return_value = {
        "limit": 10,
        "reserve": 20,
        "shares": 30,
    }

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=guest_epa_quota,
        epa_vcpu_set=epa_vcpu_set,
    )

    # Pin the exact number of helper calls instead of "called at least once":
    # one call for each of the four quotas.
    self.assertEqual(resource_allocation.call_count, 4)
    self.assertDictEqual(expected_result, result)
1115
def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    """Empty input: no NUMA entries and epa_vcpu_set stays False."""
    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={},
    )

    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
1126
def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    """Only an unrelated key: no NUMA entries and epa_vcpu_set stays False."""
    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"no_nume": "here"},
    )

    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
1138
def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    """An empty "numa-node-policy": no NUMA entries, epa_vcpu_set False."""
    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {}},
    )

    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
1150
def test__process_guest_epa_numa_params_with_no_node(self):
    """A policy with an empty "node" list: no NUMA entries, flag False."""
    policy_without_nodes = {"numa-node-policy": {"node": []}}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy_without_nodes,
    )

    self.assertEqual([], numa)
    self.assertEqual(False, vcpu_set)
1166
def test__process_guest_epa_numa_params_with_1_node_num_cores(self):
    """One node with "num-cores": expected as "cores", epa_vcpu_set True."""
    single_node = {"num-cores": 3}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [single_node]}},
    )

    self.assertEqual([{"cores": 3}], numa)
    self.assertEqual(True, vcpu_set)
1186
def test__process_guest_epa_numa_params_with_1_node_paired_threads(self):
    """"num-paired-threads" "3" is expected as integer "paired_threads"."""
    single_node = {"paired-threads": {"num-paired-threads": "3"}}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [single_node]}},
    )

    self.assertEqual([{"paired_threads": 3}], numa)
    self.assertEqual(True, vcpu_set)
1206
def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self):
    """Thread id pairs are expected as tuples of strings; flag stays False."""
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    single_node = {"paired-threads": {"paired-thread-ids": thread_pairs}}
    wanted_numa = [{"paired-threads-id": [("0", "1"), ("4", "5")]}]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [single_node]}},
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(False, vcpu_set)
1241
def test__process_guest_epa_numa_params_with_1_node_num_threads(self):
    """"num-threads" "3" is expected as integer "threads"; flag True."""
    single_node = {"num-threads": "3"}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [single_node]}},
    )

    self.assertEqual([{"threads": 3}], numa)
    self.assertEqual(True, vcpu_set)
1261
def test__process_guest_epa_numa_params_with_1_node_memory_mb(self):
    """"memory-mb" 2048 is expected as "memory" 2; flag stays False."""
    single_node = {"memory-mb": 2048}

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [single_node]}},
    )

    self.assertEqual([{"memory": 2}], numa)
    self.assertEqual(False, vcpu_set)
1281
def test__process_guest_epa_numa_params_with_1_node_vcpu(self):
    """String node and vcpu ids are expected as integers; flag stays False."""
    single_node = {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}
    wanted_numa = [{"id": 0, "vcpu": [0, 1]}]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [single_node]}},
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(False, vcpu_set)
1302
def test__process_guest_epa_numa_params_with_2_node_vcpu(self):
    """Check two nodes that each list their vCPUs by id.

    One output entry per input node is expected, in the same order,
    with integer ids; the EPA vCPU flag is expected to stay unset.
    """
    first_node = {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}
    second_node = {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]}
    wanted_numa = [
        {"id": 0, "vcpu": [0, 1]},
        {"id": 1, "vcpu": [2, 3]},
    ]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={
            "numa-node-policy": {"node": [first_node, second_node]},
        },
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(False, vcpu_set)
1331
def test__process_guest_epa_numa_params_with_1_node(self):
    """Check a single node carrying cores, paired threads, threads and memory.

    Every field is expected in the result, with the thread pairs turned
    into tuples of strings, and the EPA vCPU flag is expected to be set.
    """
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": thread_pairs,
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    wanted_numa = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        }
    ]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [node]}},
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(True, vcpu_set)
1376
def test__process_guest_epa_numa_params_with_2_nodes(self):
    """Check two fully populated nodes.

    Each node is expected to produce its own entry, in input order, and
    the EPA vCPU flag is expected to be set.
    """
    small_node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": [
                {"thread-a": 0, "thread-b": 1},
                {"thread-a": 4, "thread-b": 5},
            ],
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    large_node = {
        "num-cores": 7,
        "paired-threads": {
            "num-paired-threads": "7",
            "paired-thread-ids": [
                {"thread-a": 2, "thread-b": 3},
                {"thread-a": 5, "thread-b": 6},
            ],
        },
        "num-threads": "4",
        "memory-mb": 4096,
    }
    wanted_numa = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        },
        {
            "cores": 7,
            "paired_threads": 7,
            "paired-threads-id": [("2", "3"), ("5", "6")],
            "threads": 4,
            "memory": 4,
        },
    ]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={
            "numa-node-policy": {"node": [small_node, large_node]},
        },
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(True, vcpu_set)
1444
def test__process_guest_epa_cpu_pinning_params_with_empty_params(self):
    """Check that an empty quota gives an empty dict and an unset flag."""
    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota={},
        vcpu_count=0,
        epa_vcpu_set=False,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({}, numa)
    self.assertEqual(False, vcpu_set)
1460
def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self):
    """Check that a quota without "cpu-pinning-policy" changes nothing."""
    unrelated_quota = {"no-cpu-pinning-policy": "here"}

    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=unrelated_quota,
        vcpu_count=0,
        epa_vcpu_set=False,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({}, numa)
    self.assertEqual(False, vcpu_set)
1478
def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self):
    """Check a DEDICATED policy when the vCPU flag is already set.

    An empty dict is expected and the flag is expected to remain True.
    """
    dedicated_quota = {"cpu-pinning-policy": "DEDICATED"}

    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=dedicated_quota,
        vcpu_count=0,
        epa_vcpu_set=True,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({}, numa)
    self.assertEqual(True, vcpu_set)
1496
def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self):
    """Check DEDICATED pinning with thread policy PREFER.

    The vCPU count is expected under "threads" and the flag set.
    """
    pinning_quota = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
    }

    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_quota,
        vcpu_count=3,
        epa_vcpu_set=False,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({"threads": 3}, numa)
    self.assertEqual(True, vcpu_set)
1515
def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self):
    """Check DEDICATED pinning with thread policy ISOLATE.

    The vCPU count is expected under "cores" and the flag set.
    """
    pinning_quota = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "ISOLATE",
    }

    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_quota,
        vcpu_count=3,
        epa_vcpu_set=False,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({"cores": 3}, numa)
    self.assertEqual(True, vcpu_set)
1534
def test__process_guest_epa_cpu_pinning_params_with_policy_require(self):
    """Check DEDICATED pinning with thread policy REQUIRE.

    The vCPU count is expected under "threads" and the flag set.
    """
    pinning_quota = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "REQUIRE",
    }

    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_quota,
        vcpu_count=3,
        epa_vcpu_set=False,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({"threads": 3}, numa)
    self.assertEqual(True, vcpu_set)
1553
def test__process_guest_epa_cpu_pinning_params(self):
    """Check DEDICATED pinning with no thread policy given.

    The vCPU count is expected under "threads" and the flag set.
    """
    dedicated_quota = {"cpu-pinning-policy": "DEDICATED"}

    outcome = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=dedicated_quota,
        vcpu_count=3,
        epa_vcpu_set=False,
    )
    numa, vcpu_set = outcome

    self.assertDictEqual({"threads": 3}, numa)
    self.assertEqual(True, vcpu_set)
1571
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_empty_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Check that an empty flavor gives {} and calls no patched helper."""
    result = Ns._process_epa_params(target_flavor={})

    self.assertDictEqual({}, result)
    # Same order as the individual checks: numa, cpu pinning, quota.
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertFalse(helper.called)
1592
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_wrong_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Check a flavor without "guest-epa": {} and no patched helper called."""
    flavor_without_epa = {"no-guest-epa": "here"}

    result = Ns._process_epa_params(target_flavor=flavor_without_epa)

    self.assertDictEqual({}, result)
    # Same order as the individual checks: numa, cpu pinning, quota.
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertFalse(helper.called)
1615
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Check a guest-epa section whose NUMA policy carries a mem-policy.

    The three helpers are patched to return empty results; "mem-policy"
    is expected in the output and all three helpers are expected to be
    called.
    """
    # Stub the helpers so only _process_epa_params itself is exercised.
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    guest_epa = {
        "vcpu-count": 1,
        "numa-node-policy": {"mem-policy": "STRICT"},
    }

    result = Ns._process_epa_params(target_flavor={"guest-epa": guest_epa})

    self.assertDictEqual({"mem-policy": "STRICT"}, result)
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertTrue(helper.called)
1649
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_mempage_size(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Check a guest-epa section with "mempage-size" and a mem-policy.

    The three helpers are patched to return empty results; both values
    are expected in the output and all three helpers are expected to be
    called.
    """
    # Stub the helpers so only _process_epa_params itself is exercised.
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    guest_epa = {
        "vcpu-count": 1,
        "mempage-size": "1G",
        "numa-node-policy": {"mem-policy": "STRICT"},
    }
    wanted = {"mempage-size": "1G", "mem-policy": "STRICT"}

    result = Ns._process_epa_params(target_flavor={"guest-epa": guest_epa})

    self.assertDictEqual(wanted, result)
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertTrue(helper.called)
1685
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_numa(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Check a guest-epa section with pinning, one NUMA node and quotas.

    The three helpers are patched with canned results. The output is
    expected to carry the mempage size, both pinning policies, the NUMA
    list returned by the NUMA helper and the four quota entries; all
    three helpers are expected to be called.
    """
    quota_names = ("cpu-quota", "mem-quota", "disk-io-quota", "vif-quota")

    # Input flavor: the quota sections are appended after the NUMA policy
    # so the key order matches the descriptor layout used elsewhere.
    guest_epa = {
        "vcpu-count": 1,
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numa-node-policy": {
            "node": [
                {
                    "num-cores": 3,
                    "paired-threads": {
                        "num-paired-threads": "3",
                        "paired-thread-ids": [
                            {"thread-a": 0, "thread-b": 1},
                            {"thread-a": 4, "thread-b": 5},
                        ],
                    },
                    "num-threads": "3",
                    "memory-mb": 2048,
                },
            ],
        },
    }
    guest_epa.update(
        {
            name: {"limit": "10", "reserve": "20", "shares": "30"}
            for name in quota_names
        }
    )

    # Canned helper results (a fresh dict per quota, nothing shared).
    guest_epa_numa_params.return_value = (
        [
            {
                "cores": 3,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
                "memory": 2,
            },
        ],
        True,
    )
    guest_epa_cpu_pinning_params.return_value = ({"threads": 3}, True)
    guest_epa_quota_params.return_value = {
        name: {"limit": 10, "reserve": 20, "shares": 30}
        for name in quota_names
    }

    wanted = {
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numas": [
            {
                "cores": 3,
                "memory": 2,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
            }
        ],
        "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
    }

    result = Ns._process_epa_params(target_flavor={"guest-epa": guest_epa})

    self.assertEqual(wanted, result)
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertTrue(helper.called)
1812
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_target_flavor(self, epa_params):
    """Check that an empty target flavor raises KeyError.

    The patched EPA helper is expected not to have been called.
    """
    vnf_record = {"vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18"}

    self.assertRaises(
        KeyError,
        Ns._process_flavor_params,
        target_flavor={},
        indata={"vnf": [vnf_record]},
        vim_info={},
        target_record_id="",
    )

    self.assertFalse(epa_params.called)
1838
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_target_flavor(self, epa_params):
    """Check that a target flavor lacking the flavor keys raises KeyError.

    The patched EPA helper is expected not to have been called.
    """
    self.assertRaises(
        KeyError,
        Ns._process_flavor_params,
        target_flavor={"no-target-flavor": "here"},
        indata={},
        vim_info={},
        target_record_id="",
    )

    self.assertFalse(epa_params.called)
1860
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_indata(self, epa_params):
    """Check the flavor produced when indata is empty.

    The string sizes of the target flavor are expected as integers in
    both "find_params" and "params"; only "params" carries the name.
    """
    epa_params.return_value = {}

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2}
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor={
            "name": "test",
            "storage-gb": "10",
            "memory-mb": "1024",
            "vcpu-count": "2",
        },
        indata={},
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
1904
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_indata(self, epa_params):
    """Check the flavor produced when indata has no "vnf" entry.

    The result is expected to match the empty-indata case: integer sizes
    in both sections, with the name only under "params".
    """
    epa_params.return_value = {}

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2}
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor={
            "name": "test",
            "storage-gb": "10",
            "memory-mb": "1024",
            "vcpu-count": "2",
        },
        indata={"no-vnf": "here"},
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
1950
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_ephemeral_disk(self, epa_params):
    """Check that an ephemeral virtual storage in the vdur reaches the flavor.

    The module-level ``db`` mock is primed with a VNFD and passed in as
    the ``db`` keyword; the vdur's ephemeral storage size is expected as
    integer "ephemeral" in both result sections.
    """
    vdu = {
        "id": "without_volumes-VM",
        "name": "without_volumes-VM",
        "sw-image-desc": "ubuntu20.04",
        "alternative-sw-image-desc": [
            "ubuntu20.04-aws",
            "ubuntu20.04-azure",
        ],
        "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
    }
    storage_descriptors = [
        {"id": "root-volume", "size-of-storage": "10"},
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ]
    db.get_one.return_value = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [vdu],
        "version": "1.0",
        "virtual-storage-desc": storage_descriptors,
        "_admin": {
            "storage": {
                "fs": "mongo",
                "path": "/app/storage/",
            },
            "type": "vnfd",
        },
    }
    epa_params.return_value = {}

    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "10",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2, "ephemeral": 10}
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
2060
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_swap_disk(self, epa_params):
    """Check that a swap virtual storage in the vdur reaches the flavor.

    The vdur's swap storage size is expected as integer "swap" in both
    result sections.
    """
    epa_params.return_value = {}

    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                "size-of-storage": "20",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2, "swap": 20}
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
2124
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_persistent_root_disk(self, epa_params):
    """Check the flavor when the VDU's root volume is persistent storage.

    The module-level ``db`` mock is primed with a VNFD whose only
    storage descriptor is a persistent root volume; "disk" is expected
    to be 0 in both result sections.
    """
    vdu = {
        "id": "several_volumes-VM",
        "name": "several_volumes-VM",
        "sw-image-desc": "ubuntu20.04",
        "alternative-sw-image-desc": [
            "ubuntu20.04-aws",
            "ubuntu20.04-azure",
        ],
        "virtual-storage-desc": [
            "persistent-root-volume",
        ],
    }
    storage_descriptors = [
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
        },
    ]
    db.get_one.return_value = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "several_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "several_volumes-vnf",
        "product-name": "several_volumes-vnf",
        "vdu": [vdu],
        "version": "1.0",
        "virtual-storage-desc": storage_descriptors,
        "_admin": {
            "storage": {
                "fs": "mongo",
                "path": "/app/storage/",
            },
            "type": "vnfd",
        },
    }
    epa_params.return_value = {}

    vdur = {
        "vdu-name": "several_volumes-VM",
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "persistent-storage:persistent-storage",
                "size-of-storage": "10",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    sizes = {"disk": 0, "ram": 1024, "vcpus": 2}
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
2234
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_epa_params(self, epa_params):
    """Check that the EPA helper's result lands under "extended".

    The helper is patched to return a NUMA marker, which is expected
    under "extended" in both result sections.
    """
    epa_params.return_value = {
        "numa": "there-is-numa-here",
    }

    indata = {
        "vnf": [
            {
                "vdur": [{"ns-flavor-id": "test_id"}],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    sizes = {
        "disk": 10,
        "ram": 1024,
        "vcpus": 2,
        "extended": {"numa": "there-is-numa-here"},
    }
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
2298
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params(self, epa_params):
    """Check a flavor combining ephemeral storage, swap storage and EPA data.

    The module-level ``db`` mock is primed with a VNFD and the EPA
    helper is patched to return a NUMA marker; "ephemeral", "swap" and
    "extended" are all expected in both result sections.
    """
    vdu = {
        "id": "without_volumes-VM",
        "name": "without_volumes-VM",
        "sw-image-desc": "ubuntu20.04",
        "alternative-sw-image-desc": [
            "ubuntu20.04-aws",
            "ubuntu20.04-azure",
        ],
        "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
    }
    storage_descriptors = [
        {"id": "root-volume", "size-of-storage": "10"},
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ]
    db.get_one.return_value = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [vdu],
        "version": "1.0",
        "virtual-storage-desc": storage_descriptors,
        "_admin": {
            "storage": {
                "fs": "mongo",
                "path": "/app/storage/",
            },
            "type": "vnfd",
        },
    }
    epa_params.return_value = {
        "numa": "there-is-numa-here",
    }

    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "10",
            },
            {
                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                "size-of-storage": "20",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    sizes = {
        "disk": 10,
        "ram": 1024,
        "vcpus": 2,
        "ephemeral": 10,
        "swap": 20,
        "extended": {"numa": "there-is-numa-here"},
    }
    expected_result = {
        "find_params": {"flavor_data": sizes},
        "params": {"flavor_data": {**sizes, "name": "test"}},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)
2423
def test__process_net_params_with_empty_params(self):
    """Check a VLD that only has a name.

    A "bridge" network named "<ns name>-<vld name>" is expected, with
    the VIM's ip profile and provider network copied into the params.
    """
    vim_info = {
        "provider_network": "some-profile-here",
        "ip_profile": {
            "some_ip_profile": "here",
        },
    }
    wanted_params = {
        "net_name": "ns-name-vld-name",
        "net_type": "bridge",
        "ip_profile": {
            "some_ip_profile": "here",
        },
        "provider_network_profile": "some-profile-here",
    }

    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info=vim_info,
        target_record_id="",
    )

    self.assertDictEqual({"params": wanted_params}, result)
2459
def test__process_net_params_with_vim_info_sdn(self):
    """Check VIM info that carries an "sdn" entry.

    The params are expected to hold the VIM info without the "sdn" key,
    and no "depends_on" entry is expected.
    """
    vim_info = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "type": "sdn-type",
    }
    wanted_params = {
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "type": "sdn-type",
    }

    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info=vim_info,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual({"params": wanted_params}, result)
2492
def test__process_net_params_with_vim_info_sdn_target_vim(self):
    """Check SDN VIM info that also names a "target_vim".

    Besides the params (VIM info without "sdn"), a "depends_on" entry of
    "some-vim vld.sdn" is expected.
    """
    vim_info = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "target_vim": "some-vim",
        "type": "sdn-type",
    }
    wanted = {
        "depends_on": ["some-vim vld.sdn"],
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "target_vim": "some-vim",
            "type": "sdn-type",
        },
    }

    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info=vim_info,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(wanted, result)
2528
def test__process_net_params_with_vim_network_name(self):
    """Check that "vim_network_name" becomes a find filter on "name"."""
    wanted = {
        "find_params": {
            "filter_dict": {"name": "some-network-name"},
        },
    }

    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info={"vim_network_name": "some-network-name"},
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(wanted, result)
2558
def test__process_net_params_with_vim_network_id(self):
    """Check that "vim_network_id" becomes a find filter on "id"."""
    wanted = {
        "find_params": {
            "filter_dict": {"id": "some-network-id"},
        },
    }

    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info={"vim_network_id": "some-network-id"},
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(wanted, result)
2588
def test__process_net_params_with_mgmt_network(self):
    """Check a VLD flagged as management network with empty VIM info.

    Find params with "mgmt" True and the VLD id as "name" are expected.
    """
    mgmt_vld = {
        "id": "vld-id",
        "name": "vld-name",
        "mgmt-network": "some-mgmt-network",
    }
    wanted = {"find_params": {"mgmt": True, "name": "vld-id"}}

    result = Ns._process_net_params(
        target_vld=mgmt_vld,
        indata={"name": "ns-name"},
        vim_info={},
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(wanted, result)
2617
def test__process_net_params_with_underlay_eline(self):
    """Check an underlay VLD of type ELINE: net_type "ptp" is expected."""
    eline_vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELINE",
    }
    vim_info = {
        "provider_network": "some-profile-here",
        "ip_profile": {
            "some_ip_profile": "here",
        },
    }
    wanted_params = {
        "ip_profile": {
            "some_ip_profile": "here",
        },
        "net_name": "ns-name-vld-name",
        "net_type": "ptp",
        "provider_network_profile": "some-profile-here",
    }

    result = Ns._process_net_params(
        target_vld=eline_vld,
        indata={"name": "ns-name"},
        vim_info=vim_info,
        target_record_id="",
    )

    self.assertDictEqual({"params": wanted_params}, result)
2655
def test__process_net_params_with_underlay_elan(self):
    """Check an underlay VLD of type ELAN: net_type "data" is expected."""
    elan_vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELAN",
    }
    vim_info = {
        "provider_network": "some-profile-here",
        "ip_profile": {
            "some_ip_profile": "here",
        },
    }
    wanted_params = {
        "ip_profile": {
            "some_ip_profile": "here",
        },
        "net_name": "ns-name-vld-name",
        "net_type": "data",
        "provider_network_profile": "some-profile-here",
    }

    result = Ns._process_net_params(
        target_vld=elan_vld,
        indata={"name": "ns-name"},
        vim_info=vim_info,
        target_record_id="",
    )

    self.assertDictEqual({"params": wanted_params}, result)
2693
def test__get_cloud_init_exception(self):
    """An empty location string is expected to make _get_cloud_init raise NsException."""
    database = MagicMock(name="database mock")

    with self.assertRaises(NsException):
        Ns._get_cloud_init(db=database, fs=None, location="")
2702
def test__get_cloud_init_file_fs_exception(self):
    """A "file" location with no filesystem object (fs=None) must raise NsException."""
    vnfd_record = {
        "_admin": {
            "storage": {
                "folder": "/home/osm",
                "pkg-dir": "vnfr_test_dir",
            },
        },
    }
    database = MagicMock(name="database mock")
    database.get_one.return_value = vnfd_record

    with self.assertRaises(NsException):
        Ns._get_cloud_init(
            db=database, fs=None, location="vnfr_id_123456:file:test_file"
        )
2719
def test__get_cloud_init_file(self):
    """A "file" location returns what the opened file's read() yields."""
    expected_content = "this is a cloud init file content"

    # The object returned by fs.file_open() is used as a context manager, so the
    # content is wired onto __enter__().read().
    opened_file = MagicMock(name="file mock")
    opened_file.__enter__.return_value.read.return_value = expected_content

    filesystem = MagicMock(name="filesystem mock")
    filesystem.file_open.return_value = opened_file

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {
                "folder": "/home/osm",
                "pkg-dir": "vnfr_test_dir",
            },
        },
    }

    content = Ns._get_cloud_init(
        db=database, fs=filesystem, location="vnfr_id_123456:file:test_file"
    )

    self.assertEqual(expected_content, content)
2742
def test__get_cloud_init_vdu(self):
    """A "vdu" location returns the "cloud-init" value stored in the DB record."""
    expected_content = "this is a cloud init file content"

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "vdu": {
            0: {
                "cloud-init": expected_content,
            },
        },
    }

    content = Ns._get_cloud_init(
        db=database, fs=None, location="vnfr_id_123456:vdu:0"
    )

    self.assertEqual(expected_content, content)
2761
@patch("jinja2.Environment.__init__")
def test__parse_jinja2_undefined_error(self, env_mock: Mock):
    """UndefinedError raised while building the Environment surfaces as NsException."""
    env_mock.side_effect = UndefinedError("UndefinedError occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)
2774
@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_error(self, env_mock: Mock):
    """TemplateError raised while building the Environment surfaces as NsException."""
    env_mock.side_effect = TemplateError("TemplateError occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)
2787
@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_not_found(self, env_mock: Mock):
    """TemplateNotFound raised while building the Environment surfaces as NsException."""
    env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)
2800
def test_rendering_jinja2_temp_without_special_characters(self):
    """Parameters without HTML-special characters are substituted verbatim."""
    render_params = {
        "type": "mbr",
        "is_override": "False",
        "command": "; mkdir abc",
    }
    # NOTE: the closing quotes of both literals stay indented on purpose, so the
    # strings end in spaces rather than a newline. Jinja2 strips a single
    # trailing newline from a template by default, which would make the
    # rendered text differ from the expected text.
    template_text = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
        - [ ls, -l, / ]
        - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    expected_text = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
        - [ ls, -l, / ]
        - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
    """

    rendered = Ns._parse_jinja2(
        cloud_init_content=template_text,
        params=render_params,
        context="cloud-init for VM",
    )

    self.assertEqual(rendered, expected_text)
2832
def test_rendering_jinja2_temp_with_special_characters(self):
    """A parameter containing "&" must not be rendered verbatim.

    The rendered text is compared against the text that a plain, unescaped
    substitution would produce; the two are expected to differ.
    """
    cloud_init_content = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
        - [ ls, -l, / ]
        - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    params = {
        "type": "mbr",
        "is_override": "False",
        # Must match the command embedded in expected_result below; otherwise
        # assertNotEqual passes trivially and the test checks nothing about
        # how the special character is handled.
        "command": "& rm -rf /",
    }
    context = "cloud-init for VM"
    # What an unescaped, verbatim substitution would look like.
    expected_result = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
        - [ ls, -l, / ]
        - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
    """
    result = Ns._parse_jinja2(
        cloud_init_content=cloud_init_content, params=params, context=context
    )
    self.assertNotEqual(result, expected_result)
2864
def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self):
    """With an autoescape-disabled Environment, "&" is rendered verbatim."""
    # Environment here is the jinja2 class imported by this test module; only
    # the name looked up inside osm_ng_ro.ns is patched below.
    unescaped_env = Environment(
        undefined=StrictUndefined,
        autoescape=select_autoescape(default_for_string=False, default=False),
    )
    render_params = {
        "type": "mbr",
        "is_override": "False",
        "command": "& rm -rf /",
    }
    # NOTE: the closing quotes stay indented so the literals do not end in a
    # newline (Jinja2 strips one trailing newline from templates by default).
    template_text = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
        - [ ls, -l, / ]
        - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    expected_text = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
        - [ ls, -l, / ]
        - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
    """

    with patch("osm_ng_ro.ns.Environment", return_value=unescaped_env):
        rendered = Ns._parse_jinja2(
            cloud_init_content=template_text,
            params=render_params,
            context="cloud-init for VM",
        )
        self.assertEqual(rendered, expected_text)
2903
@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task(self, assign_vim):
    """rebuild_start_stop_task builds the expected task for each supported action.

    Every action ("start", "stop", "rebuild") runs in its own subTest so that a
    failure names the offending action and the remaining actions still run.
    """
    self.ns = Ns()
    actions = ["start", "stop", "rebuild"]
    vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
    vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
    vdu_index = "0"
    action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
    nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
    task_index = 0
    target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    t = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
    for action in actions:
        with self.subTest(action=action):
            expected_result = {
                "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
                "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
                "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
                "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
                "status": "SCHEDULED",
                "action": "EXEC",
                "item": "update",
                "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
                "target_record_id": t,
                "params": {
                    "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
                    "action": action,
                },
            }
            # Fresh input per action so iterations cannot influence each other.
            extra_dict = {
                "params": {
                    "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
                    "action": action,
                }
            }
            task = self.ns.rebuild_start_stop_task(
                vdu_id,
                vnf_id,
                vdu_index,
                action_id,
                nsr_id,
                task_index,
                target_vim,
                extra_dict,
            )
            self.assertEqual(task.get("action_id"), action_id)
            self.assertEqual(task.get("nsr_id"), nsr_id)
            self.assertEqual(task.get("target_id"), target_vim)
            self.assertDictEqual(task, expected_result)
2951
@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task(self, assign_vim):
    """verticalscale_task builds a SCHEDULED "verticalscale" task for the VDU."""
    self.ns = Ns()
    action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
    nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
    vnf = {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
    vdu = {
        "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
        "vim_info": {
            "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
        },
    }
    extra_dict = {
        "params": {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "flavor_dict": "flavor_dict",
        }
    }
    expected_task = {
        "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
        "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
        "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
        "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
        "status": "SCHEDULED",
        "action": "EXEC",
        "item": "verticalscale",
        "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
        "target_record_id": (
            "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
            "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
        ),
        "params": {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "flavor_dict": "flavor_dict",
        },
    }

    # Positional order: vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
    task = self.ns.verticalscale_task(vdu, vnf, "1", action_id, nsr_id, 1, extra_dict)

    self.assertDictEqual(task, expected_task)
2996
@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task(self, assign_vim):
    """migrate_task builds a SCHEDULED "migrate" task for the VDU."""
    self.ns = Ns()
    action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
    nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
    vnf = {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
    vdu = {
        "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
        "vim_info": {
            "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
        },
    }
    extra_dict = {
        "params": {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "migrate_host": "migrateToHost",
        }
    }
    expected_task = {
        "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
        "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
        "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
        "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
        "status": "SCHEDULED",
        "action": "EXEC",
        "item": "migrate",
        "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
        "target_record_id": (
            "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
            "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
        ),
        "params": {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "migrate_host": "migrateToHost",
        },
    }

    # Positional order: vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
    task = self.ns.migrate_task(vdu, vnf, "1", action_id, nsr_id, 1, extra_dict)

    self.assertDictEqual(task, expected_task)
3041
3042
3043 class TestProcessVduParams(unittest.TestCase):
def setUp(self):
    """Give every test its own mock logger and Ns instance."""
    self.logger = CopyingMock(autospec=True)
    self.ns = Ns()
3046 self.logger = CopyingMock(autospec=True)
3047
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """Root volume is taken from the descriptor when no instantiation volumes are given."""
    mock_volume_keeping_required.return_value = True
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    collected_disks = []
    root_vsd = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
    }
    expected_root = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    expected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        },
    ]

    found_root = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(found_root, expected_root)
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
    self.assertEqual(collected_disks, expected_disks)
    self.assertEqual(len(collected_disks), 1)
3085
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
    self, mock_volume_keeping_required
):
    """The first virtual-storage-desc listed on the VDU is treated as the root volume."""
    mock_volume_keeping_required.return_value = True
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    # Put a non-"root" named volume first in the VDU's storage list.
    descriptor["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    collected_disks = []
    root_vsd = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    expected_root = {
        "persistent-volume2": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    expected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        },
    ]

    found_root = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(found_root, expected_root)
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
    self.assertEqual(collected_disks, expected_disks)
    self.assertEqual(len(collected_disks), 1)
3127
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_size_of_storage(
    self, mock_volume_keeping_required
):
    """No root volume is reported when the selected descriptor has an empty size."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][0]["size-of-storage"] = ""
    descriptor["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    collected_disks = []

    found_root = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(found_root, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, [])
3149
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_keeping_is_not_required(
    self, mock_volume_keeping_required
):
    """The root volume entry carries keep=False when keeping is not required."""
    mock_volume_keeping_required.return_value = False
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
        {"key": "keep-volume", "value": "false"},
    ]
    collected_disks = []
    root_vsd = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
    }
    expected_root = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    expected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]

    found_root = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(found_root, expected_root)
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
    self.assertEqual(collected_disks, expected_disks)
    self.assertEqual(len(collected_disks), 1)
3190
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_target_vdu_mismatch(
    self, mock_volume_keeping_required
):
    """Nothing is found when the descriptor's VDU name differs from the target VDU."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    # Same name as the fixture but with different letter case.
    descriptor["vdu"][0]["name"] = "Several_Volumes-VM"
    collected_disks = []

    found_root = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(found_root, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, [])
    self.assertEqual(len(collected_disks), 0)
3208
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_with_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """An existing VIM volume named in the instantiation params is used as root."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    instantiation_volumes = [
        {
            "vim-volume-id": vim_volume_id,
            "name": "persistent-root-volume",
        }
    ]
    collected_disks = []
    expected_root = {
        "persistent-root-volume": {
            "vim_volume_id": vim_volume_id,
            "image_id": "ubuntu20.04",
        },
    }
    expected_disks = [
        {
            "vim_volume_id": vim_volume_id,
            "image_id": "ubuntu20.04",
        },
    ]

    found_root = self.ns.find_persistent_root_volumes(
        descriptor,
        target_vdu_wth_persistent_storage,
        instantiation_volumes,
        collected_disks,
    )

    self.assertEqual(found_root, expected_root)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, expected_disks)
    self.assertEqual(len(collected_disks), 1)
3242
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_invalid_instantiation_params(
    self, mock_volume_keeping_required
):
    """Find persistent root volume, existing volume id keyword is invalid.

    The instantiation entry uses "volume-id" instead of "vim-volume-id", which
    is expected to raise KeyError and leave the disk list untouched.
    """
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = [
        {
            "volume-id": vim_volume_id,
            "name": "persistent-root-volume",
        }
    ]
    disk_list = []
    with self.assertRaises(KeyError):
        self.ns.find_persistent_root_volumes(
            vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
        )
    # These checks live outside the assertRaises block on purpose: statements
    # placed after the raising call inside the block would never execute.
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disk_list, [])
    self.assertEqual(len(disk_list), 0)
3264
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume is appended; root disk exists, instantiation list is empty."""
    mock_volume_keeping_required.return_value = False
    root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    collected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    ordinary_vsd = {
        "id": "persistent-volume2",
        "size-of-storage": "10",
        "type-of-storage": "persistent-storage:persistent-storage",
    }
    expected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
        {
            "size": "10",
            "keep": False,
        },
    ]

    self.ns.find_persistent_volumes(
        root_disk, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(collected_disks, expected_disks)
    mock_volume_keeping_required.assert_called_once_with(ordinary_vsd)
3308
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume reuses the vim-volume-id given at instantiation."""
    root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    instantiation_volumes = [
        {
            "vim-volume-id": vim_volume_id,
            "name": "persistent-volume2",
        }
    ]
    collected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    expected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
        {
            "vim_volume_id": vim_volume_id,
        },
    ]

    self.ns.find_persistent_volumes(
        root_disk,
        target_vdu_wth_persistent_storage,
        instantiation_volumes,
        collected_disks,
    )

    self.assertEqual(collected_disks, expected_disks)
    mock_volume_keeping_required.assert_not_called()
3350
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wthout_persistent_storage(
    self, mock_volume_keeping_required
):
    """Find persistent ordinary volume, there is not any persistent disk.

    The disk list starts empty and is expected to remain empty.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = target_vdu_wthout_persistent_storage
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # Compare against an independent expected value; comparing disk_list with
    # itself can never fail.
    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()
3366
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
    self, mock_volume_keeping_required
):
    """There is persistent root disk, but there is not ordinary persistent disk.

    The disk list holds only the root disk before the call and is expected to
    be unchanged afterwards.
    """
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["virtual-storages"] = [
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ]
    disk_list = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    # Snapshot taken before the call, so the final comparison can detect any
    # change; comparing disk_list with itself can never fail.
    expected_disk_list = deepcopy(disk_list)
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    self.assertEqual(disk_list, expected_disk_list)
    mock_volume_keeping_required.assert_not_called()
3409
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
    self, mock_volume_keeping_required
):
    """Instantiation volume name matches no disk, so the volume is built from its descriptor.

    A vim-volume-id is supplied for "persistent-volume3", which is not the id of
    the ordinary persistent disk ("persistent-volume2").
    """
    mock_volume_keeping_required.return_value = True
    root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    instantiation_volumes = [
        {
            "vim-volume-id": vim_volume_id,
            "name": "persistent-volume3",
        }
    ]
    collected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    ordinary_vsd = {
        "id": "persistent-volume2",
        "size-of-storage": "10",
        "type-of-storage": "persistent-storage:persistent-storage",
    }
    expected_disks = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
        {
            "size": "10",
            "keep": True,
        },
    ]

    self.ns.find_persistent_volumes(
        root_disk,
        target_vdu_wth_persistent_storage,
        instantiation_volumes,
        collected_disks,
    )

    self.assertEqual(collected_disks, expected_disks)
    mock_volume_keeping_required.assert_called_once_with(ordinary_vsd)
3460
def test_is_volume_keeping_required_true(self):
    """A "keep-volume" requirement with value "true" means the volume is kept."""
    keep_requirement = {"key": "keep-volume", "value": "true"}
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }

    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), True)
3473
def test_is_volume_keeping_required_false(self):
    """A "keep-volume" requirement with value "false" means the volume is not kept."""
    keep_requirement = {"key": "keep-volume", "value": "false"}
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }

    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), False)
3486
def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """Without a "vdu-storage-requirements" key the volume is not kept."""
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }

    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), False)
3496
def test_is_volume_keeping_required_wrong_keyword(self):
    """A requirement keyed "hold-volume" (not "keep-volume") does not keep the volume."""
    unknown_requirement = {"key": "hold-volume", "value": "true"}
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [unknown_requirement],
    }

    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), False)
3509
def test_sort_vdu_interfaces_position_all_wth_positions(self):
    """Interfaces that all carry a position end up ordered by that position."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 2,
        },
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
            "position": 1,
        },
    ]
    expected_order = [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
            "position": 1,
        },
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 2,
        },
    ]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], expected_order)
3539
def test_sort_vdu_interfaces_position_some_wth_position(self):
    """An interface with a position is placed before one without a position."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        },
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 1,
        },
    ]
    expected_order = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 1,
        },
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        },
    ]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], expected_order)
3567
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting an empty interface list leaves it empty."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [])
3575
def test_partially_locate_vdu_interfaces(self):
    """Positioned interfaces land at index position - 1 when positions start at 1."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
        },
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 1,
        },
    ]

    self.ns._partially_locate_vdu_interfaces(vdu)

    located = vdu["interfaces"]
    # Position 1 is found at index 0 and position 3 at index 2.
    self.assertDictEqual(
        located[0],
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 1,
        },
    )
    self.assertDictEqual(
        located[2],
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
    )
3608
def test_partially_locate_vdu_interfaces_position_start_from_0(self):
    """Positioned interfaces land at index == position when positions start at 0."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
        },
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 0,
        },
    ]

    self.ns._partially_locate_vdu_interfaces(vdu)

    located = vdu["interfaces"]
    # Position 0 is found at index 0 and position 3 at index 3.
    self.assertDictEqual(
        located[0],
        {
            "name": "vdu-eth1",
            "ns-vld-id": "datanet",
            "position": 0,
        },
    )
    self.assertDictEqual(
        located[3],
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
    )
3641
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Interfaces do not have positions.

    The interface list is expected to come back unchanged.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Work on a private copy: the method under test operates on the interface
    # list, and the module-level fixture is shared with other tests.
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(interfaces_wthout_positions)
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_result)
3649
def test_partially_locate_vdu_interfaces_all_has_position(self):
    """All interfaces have position.

    Every interface is expected at the index given by its position.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Work on a private copy: the method under test operates on the interface
    # list, and the module-level fixture is shared with other tests.
    target_vdu["interfaces"] = deepcopy(interfaces_wth_all_positions)
    expected_interfaces = [
        {
            "name": "vdu-eth2",
            "ns-vld-id": "net2",
            "position": 0,
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
            "position": 1,
        },
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "position": 2,
        },
    ]
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_interfaces)
3673
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init(self, mock_parse_jinja2, mock_get_cloud_init):
    """With cloud-init and boot-data-drive set, both appear in the returned dict."""
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.return_value = user_data
    cloud_init_path = "sample-cloud-init-path"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["cloud-init"] = cloud_init_path
    vdu["boot-data-drive"] = "vda"

    prepared = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(
        prepared,
        {
            "user-data": user_data,
            "boot-data-drive": "vda",
        },
    )
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )
3698
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.side_effect = NsException(
        "Mismatch descriptor for cloud init."
    )

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    # Checked after the block: a statement following the raising call inside
    # the ``with`` body would never execute.
    self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_not_called()
3721
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.side_effect = NsException("Error parsing cloud-init content.")

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    # Checked after the block: a statement following the raising call inside
    # the ``with`` body would never execute.
    self.assertEqual(str(err.exception), "Error parsing cloud-init content.")
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )
3746
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """VDU has cloud-init but no boot-data-drive; only user-data is returned."""
    cloud_init_path = "sample-cloud-init-path"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["cloud-init"] = cloud_init_path
    cloud_init_cache = {}
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.return_value = user_data

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, cloud_init_cache, db, fs)

    self.assertDictEqual(cloud_config, {"user-data": user_data})
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location=cloud_init_path
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )
3771
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Cloud-init content already present in vdu2cloud_init is not fetched again."""
    cloud_init_path = "sample-cloud-init-path"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"cloud-init": cloud_init_path, "boot-data-drive": "vda"})
    # Pre-populated mapping: the content for this path is already known.
    cloud_init_cache = {cloud_init_path: cloud_init_content}
    mock_parse_jinja2.return_value = user_data

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, cloud_init_cache, db, fs)

    self.assertDictEqual(
        cloud_config, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )
3795
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_no_cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """VDU without cloud-init only yields the boot-data-drive entry."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["boot-data-drive"] = "vda"
    cloud_init_cache = {}

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, cloud_init_cache, db, fs)

    self.assertDictEqual(cloud_config, {"boot-data-drive": "vda"})
    # Neither helper is expected to be touched in this scenario.
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_not_called()
3812
def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self):
    """With both ns-vld-id and vnf-vld-id set, the ns vld text is returned."""
    iface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    net_text = self.ns._check_vld_information_of_interfaces(
        iface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, f"{ns_preffix}:vld.mgmtnet")
3825
def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict produces an empty net text."""
    net_text = self.ns._check_vld_information_of_interfaces(
        {}, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, "")
3833
def test_check_vld_information_of_interfaces_has_only_vnf_vld(self):
    """With only vnf-vld-id set, the vnf vld text is returned."""
    iface = {"name": "vdu-eth0", "vnf-vld-id": "mgmt_cp_int"}
    net_text = self.ns._check_vld_information_of_interfaces(
        iface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, f"{vnf_preffix}:vld.mgmt_cp_int")
3845
def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
    self,
):
    """Interface dict has only vnf_vld but vnf_preffix does not exist."""
    interface = {
        "name": "vdu-eth0",
        "vnf-vld-id": "mgmt_cp_int",
    }
    vnf_preffix = None
    # Pin the exception type in assertRaises itself. The previous form caught
    # any Exception and then compared ``type(err)`` (the assertRaises context
    # object, never the exception) with TypeError, so the type was not
    # actually verified.
    with self.assertRaises(TypeError):
        self.ns._check_vld_information_of_interfaces(
            interface, ns_preffix, vnf_preffix
        )
3860
def test_prepare_interface_port_security_has_security_details(self):
    """Dash-style port security keys are replaced by underscore-style keys."""
    common_fields = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    iface = dict(common_fields)
    iface["port-security-enabled"] = True
    iface["port-security-disable-strategy"] = "allow-address-pairs"
    converted_iface = dict(common_fields)
    converted_iface["port_security"] = True
    converted_iface["port_security_disable_strategy"] = "allow-address-pairs"
    # The method works in place on the interface dict.
    self.ns._prepare_interface_port_security(iface)
    self.assertDictEqual(iface, converted_iface)
3879
def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict stays empty."""
    iface = {}
    self.ns._prepare_interface_port_security(iface)
    self.assertDictEqual(iface, {})
3886
def test_prepare_interface_port_security_wthout_port_security(self):
    """An interface without port security keys is left as it was."""
    iface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    # Independent copy of the input to compare against after the call.
    untouched_iface = dict(iface)
    self.ns._prepare_interface_port_security(iface)
    self.assertDictEqual(iface, untouched_iface)
3901
def test_create_net_item_of_interface_floating_ip_port_security(self):
    """Floating ip and port security details are carried into the net item."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    net_text = f"{ns_preffix}"
    net_item = self.ns._create_net_item_of_interface(iface, net_text)
    # vcpi and the vld ids are not expected in the resulting item.
    self.assertDictEqual(
        net_item,
        {
            "name": "vdu-eth0",
            "port_security": True,
            "port_security_disable_strategy": "allow-address-pairs",
            "floating_ip": "10.1.1.12",
            "net_id": f"TASK-{ns_preffix}",
            "type": "virtual",
        },
    )
3924
def test_create_net_item_of_interface_invalid_net_text(self):
    """Passing None as net_text raises TypeError."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    with self.assertRaises(TypeError):
        self.ns._create_net_item_of_interface(iface, None)
3939
def test_create_net_item_of_interface_empty_interface(self):
    """An empty interface dict still yields net_id and type."""
    net_item = self.ns._create_net_item_of_interface({}, ns_preffix)
    self.assertDictEqual(
        net_item, {"net_id": f"TASK-{ns_preffix}", "type": "virtual"}
    )
3950
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_sriov(self, mock_deep_get):
    """SR-IOV interface: net item is marked as data and the task net_type is data."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "SR-IOV",
    }
    mock_deep_get.return_value = "SR-IOV"
    net_text = ns_preffix
    net_item = {}
    # This test works on the module-level tasks_by_target_record_id fixture.
    self.ns._prepare_type_of_interface(
        iface, tasks_by_target_record_id, net_text, net_item
    )
    self.assertDictEqual(
        net_item, {"use": "data", "model": "SR-IOV", "type": "SR-IOV"}
    )
    task_params = tasks_by_target_record_id[net_text]["extra_dict"]["params"]
    self.assertEqual("data", task_params["net_type"])
    mock_deep_get.assert_called_once_with(
        tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type"
    )
3983
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
    self, mock_deep_get
):
    """PCI-PASSTHROUGH interface while deep_get returns an empty dict."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "PCI-PASSTHROUGH",
    }
    mock_deep_get.return_value = {}
    tasks = {}
    net_text = ns_preffix
    net_item = {}
    self.ns._prepare_type_of_interface(iface, tasks, net_text, net_item)
    self.assertDictEqual(
        net_item,
        {"use": "data", "model": "PCI-PASSTHROUGH", "type": "PCI-PASSTHROUGH"},
    )
    mock_deep_get.assert_called_once_with(
        tasks, net_text, "extra_dict", "params", "net_type"
    )
4015
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get):
    """OM-MGMT interface: net item only gets use=mgmt and deep_get is not used."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "OM-MGMT",
    }
    tasks = {}
    net_item = {}
    self.ns._prepare_type_of_interface(iface, tasks, ns_preffix, net_item)
    self.assertDictEqual(net_item, {"use": "mgmt"})
    mock_deep_get.assert_not_called()
4040
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_bridge(self, mock_deep_get):
    """Interface without a type: net item gets use=bridge and model=None."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    tasks = {}
    net_item = {}
    self.ns._prepare_type_of_interface(iface, tasks, ns_preffix, net_item)
    self.assertDictEqual(net_item, {"use": "bridge", "model": None})
    mock_deep_get.assert_not_called()
4065
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Three connected interfaces are processed and collected in net_list."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    interfaces = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "ip-address": "13.2.12.31",
            "mgmt-interface": True,
        },
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    vdu["interfaces"] = interfaces
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    net_texts = [
        f"{ns_preffix}:net1",
        f"{vnf_preffix}:net2",
        f"{ns_preffix}:mgmtnet",
    ]
    net_items = [
        {"name": name, "net_id": f"TASK-{ns_preffix}", "type": "virtual"}
        for name in ("vdu-eth1", "vdu-eth2", "vdu-eth3")
    ]
    # Each mocked helper hands out one prepared value per interface.
    mock_item_of_interface.side_effect = list(net_items)
    mock_vld_information_of_interface.side_effect = list(net_texts)
    net_list = []

    # Expectations are built from copies before the call; the third item
    # is the very object returned by the mock.
    expected_net_list = [
        {**net_items[0], "ip_address": "13.2.12.31"},
        {**net_items[1], "mac_address": "d0:94:66:ed:fc:e2"},
        net_items[2],
    ]
    expected_extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": list(net_texts),
        "mgmt_vdu_interface": 0,
    }

    self.ns._prepare_vdu_interfaces(
        vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for index, interface in enumerate(interfaces):
        self.assertEqual(
            vld_calls[index].args, (interface, ns_preffix, vnf_preffix)
        )

    port_security_calls = mock_port_security.call_args_list
    for index, interface in enumerate(interfaces):
        self.assertEqual(port_security_calls[index].args[0], interface)

    item_calls = mock_item_of_interface.call_args_list
    for index, interface in enumerate(interfaces):
        self.assertEqual(item_calls[index].args, (interface, net_texts[index]))

    type_calls = mock_type_of_interface.call_args_list
    for index, interface in enumerate(interfaces):
        self.assertEqual(
            type_calls[index].args,
            (
                interface,
                tasks_by_target_record_id,
                net_texts[index],
                net_items[index],
            ),
        )

    self.assertEqual(net_list, expected_net_list)
    self.assertEqual(extra_dict, expected_extra_dict)
    self.logger.error.assert_not_called()
4188
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_create_net_item_raise_exception(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    interface_1 = {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "ip-address": "13.2.12.31",
        "mgmt-interface": True,
    }
    interface_2 = {
        "name": "vdu-eth2",
        "vnf-vld-id": "net2",
        "mac-address": "d0:94:66:ed:fc:e2",
    }
    interface_3 = {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
    }
    target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    net_text_1 = f"{ns_preffix}:net1"
    mock_item_of_interface.side_effect = [TypeError, TypeError, TypeError]

    mock_vld_information_of_interface.side_effect = [net_text_1]
    net_list = []
    expected_extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [net_text_1],
    }
    with self.assertRaises(TypeError):
        self.ns._prepare_vdu_interfaces(
            target_vdu,
            extra_dict,
            ns_preffix,
            vnf_preffix,
            self.logger,
            tasks_by_target_record_id,
            net_list,
        )

    # Everything below sits after the assertRaises block on purpose:
    # statements following the raising call inside the block would be skipped.
    _call_mock_vld_information_of_interface = (
        mock_vld_information_of_interface.call_args_list
    )
    self.assertEqual(
        _call_mock_vld_information_of_interface[0][0],
        (interface_1, ns_preffix, vnf_preffix),
    )

    _call_mock_port_security = mock_port_security.call_args_list
    self.assertEqual(_call_mock_port_security[0].args[0], interface_1)

    _call_mock_item_of_interface = mock_item_of_interface.call_args_list
    self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1))

    mock_type_of_interface.assert_not_called()
    self.logger.error.assert_not_called()
    self.assertEqual(net_list, [])
    self.assertEqual(extra_dict, expected_extra_dict)
4262
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_vld_information_is_empty(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Empty vld information for every interface: each is logged and skipped."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    interfaces = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "ip-address": "13.2.12.31",
            "mgmt-interface": True,
        },
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    vdu["interfaces"] = interfaces
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    mock_vld_information_of_interface.side_effect = ["", "", ""]
    net_list = []

    self.ns._prepare_vdu_interfaces(
        vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for index, interface in enumerate(interfaces):
        self.assertEqual(
            vld_calls[index].args, (interface, ns_preffix, vnf_preffix)
        )

    # One error message per interface, carrying the interface index.
    error_calls = self.logger.error.call_args_list
    for index in range(len(interfaces)):
        self.assertEqual(
            error_calls[index].args,
            (
                f"Interface {index} from vdu several_volumes-VM not connected to any vld",
            ),
        )

    self.assertEqual(net_list, [])
    self.assertEqual(
        extra_dict,
        {
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": [],
        },
    )

    mock_item_of_interface.assert_not_called()
    mock_port_security.assert_not_called()
    mock_type_of_interface.assert_not_called()
4351
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_empty_interface_list(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """With an empty interface list none of the per-interface helpers is called."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []
    self.ns._prepare_vdu_interfaces(
        vdu,
        {},
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        [],
    )
    mock_type_of_interface.assert_not_called()
    mock_vld_information_of_interface.assert_not_called()
    mock_item_of_interface.assert_not_called()
    mock_port_security.assert_not_called()
4381
def test_prepare_vdu_ssh_keys(self):
    """VDU ssh-keys and the RO public key both end up in key-pairs."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"ssh-keys": ["sample-ssh-key"], "ssh-access-required": True})
    ro_key = {"public_key": "path_of_public_key"}
    cloud_config = {}
    # The method fills cloud_config in place.
    self.ns._prepare_vdu_ssh_keys(vdu, ro_key, cloud_config)
    self.assertDictEqual(
        cloud_config,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )
4394
def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """Without ssh-keys in the VDU only the RO public key is in key-pairs."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-access-required"] = True
    ro_key = {"public_key": "path_of_public_key"}
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, ro_key, cloud_config)
    self.assertDictEqual(
        cloud_config, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )
4404
def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """With ssh-access-required False only the VDU ssh-keys are in key-pairs."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"ssh-keys": ["sample-ssh-key"], "ssh-access-required": False})
    ro_key = {"public_key": "path_of_public_key"}
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(vdu, ro_key, cloud_config)
    self.assertDictEqual(cloud_config, {"key-pairs": ["sample-ssh-key"]})
4415
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_false(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent root disk is added to disk_list with keep set to False."""
    selected_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.return_value = selected_disk
    mock_volume_keeping_required.return_value = False
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][1] = selected_disk
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    root_disks = {}
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disk_list
    )

    self.assertEqual(
        disk_list,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": False}],
    )
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_called_once()
4447
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Add persistent root disk to disk_list, select_persistent_root_disk raises."""
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.side_effect = AttributeError
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk = {}
    disk_list = []
    with self.assertRaises(AttributeError):
        self.ns._add_persistent_root_disk_to_disk_list(
            vnfd, target_vdu, persistent_root_disk, disk_list
        )

    # Post-conditions are verified after the assertRaises block; statements
    # following the raising call inside the block would never execute.
    self.assertEqual(disk_list, [])
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_not_called()
4472
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_true(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent root disk is added to disk_list with keep set to True."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    selected_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [
            {"key": "keep-volume", "value": "true"},
        ],
    }
    mock_volume_keeping_required.return_value = True
    mock_select_persistent_root_disk.return_value = selected_disk
    root_disks = {}
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disk_list
    )

    self.assertEqual(
        disk_list,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": True}],
    )
    # The keep decision must be taken on the selected root disk.
    mock_volume_keeping_required.assert_called_once_with(selected_disk)
4505
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list(
    self, mock_volume_keeping_required
):
    """Persistent ordinary disk is added to disk_list with keep set to False."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_volume_keeping_required.return_value = False
    # The root volume is already registered, so it must not be added again.
    root_disks = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    volume2 = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    ordinary_disks = {}
    disk_list = []

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        vdu, root_disks, ordinary_disks, disk_list
    )

    self.assertEqual(disk_list, [{"size": "10", "keep": False}])
    mock_volume_keeping_required.assert_called_once_with(volume2)
4538
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
    self, mock_volume_keeping_required
):
    """A volume whose id is already in the root disk dict is not added."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_volume_keeping_required.return_value = False
    root_disks = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        },
        "persistent-volume2": {
            "size": "10",
        },
    }
    ordinary_disks = {}
    disk_list = []

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        vdu, root_disks, ordinary_disks, disk_list
    )

    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()
4564
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """VNFD without persistent storage leaves disk_list empty."""
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    mock_select_persistent_root_disk.return_value = None
    root_disks = {}
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disk_list
    )

    self.assertEqual(disk_list, [])
    # The selector is expected to have been consulted twice for this fixture.
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()
4582
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Nothing is appended when the persistent root disk dict stays empty.

    NOTE(review): the set-up is the same as the
    ``..._vnfd_wthout_persistent_storage`` test; confirm whether a distinct
    scenario was intended here.
    """
    mock_select_persistent_root_disk.return_value = None
    vnfd_copy = deepcopy(vnfd_wthout_persistent_storage)
    vdu_copy = deepcopy(target_vdu_wthout_persistent_storage)
    empty_root_disk = {}
    collected_disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        vnfd_copy, vdu_copy, empty_root_disk, collected_disks
    )

    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    self.assertEqual(collected_disks, [])
4600
def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
    """An empty extra_dict is rejected with NsException.

    The VDU carries an affinity group id while ``extra_dict`` is ``{}``; the
    call is expected to raise NsException with the message
    "Invalid extra_dict format.".
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
    extra_dict = {}
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)
    # Verified after the ``with`` block: ``err.exception`` is only populated
    # once the context manager exits, and any statement placed after the
    # raising call inside the block would never execute.
    self.assertEqual(str(err.exception), "Invalid extra_dict format.")
4610
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A single affinity group yields one task reference and one dependency."""
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_text = (
        f"{ns_preffix}:affinity-or-anti-affinity-group.sample_affinity-group-id"
    )
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    dependencies = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, dependencies, ns_preffix)

    # The helper is expected to record the dependency in place ...
    self.assertDictEqual(dependencies, {"depends_on": [group_text]})
    # ... and to return the matching TASK- prefixed reference.
    self.assertEqual(groups, [{"affinity_group_id": "TASK-" + group_text}])
4625
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """Two affinity groups yield two task references, in the given order."""
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_ids = ["affinity-group-id1", "affinity-group-id2"]
    group_texts = [
        f"{ns_preffix}:affinity-or-anti-affinity-group.{group_id}"
        for group_id in group_ids
    ]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = list(group_ids)
    dependencies = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, dependencies, ns_preffix)

    self.assertDictEqual(dependencies, {"depends_on": list(group_texts)})
    self.assertEqual(
        groups, [{"affinity_group_id": "TASK-" + text} for text in group_texts]
    )
4647
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """Without an affinity group on the VDU nothing is returned or recorded."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    dependencies = {"depends_on": []}

    groups = self.ns._prepare_vdu_affinity_group_list(
        vdu, dependencies, "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    )

    self.assertEqual(groups, [])
    self.assertDictEqual(dependencies, {"depends_on": []})
4658
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_with_inst_vol_list(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Instantiation volume list is supplied through additionalParams.

    With one existing volume under ``additionalParams.OSM.vdu_volumes`` the
    test expects ``find_persistent_volumes`` to be called with that list and
    neither ``_add_persistent_*_to_disk_list`` helper to be used.
    """
    instantiation_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume2"}
    ]
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = interfaces_wth_all_positions
    target_vdu["additionalParams"] = {"OSM": {"vdu_volumes": instantiation_volumes}}

    root_disk = {
        "persistent-root-volume": {"image_id": "ubuntu20.04", "size": "10"}
    }
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = root_disk

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected = deepcopy(expected_extra_dict)
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    # Interface handling: every interface has a position, so only sorting runs.
    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    # Disk handling: the add-to-disk-list helpers are bypassed.
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu, expected, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_called_once_with(
        root_disk, target_vdu, instantiation_volumes, []
    )
4741
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_affinity_groups(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There are affinity groups.

    The affinity group helper is stubbed to return two groups, which are
    expected to show up under ``params.affinity_group_list`` of the result.
    """
    self.maxDiff = None
    affinity_groups = ("affinity_group_1", "affinity_group_2")

    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    target_vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = list(affinity_groups)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected = deepcopy(expected_extra_dict2)
    expected["params"]["affinity_group_list"] = list(affinity_groups)
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    self.assertDictEqual(result, expected)
    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu, expected, ns_preffix, vnf_preffix, "logger", {}, []
    )
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
4814
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_cloud_config(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There is cloud-config.

    The cloud-init helper is stubbed to return user data plus a boot data
    drive; the same content is expected under ``params.cloud_config`` and as
    the third argument handed to the ssh-keys helper.
    """
    self.maxDiff = None
    # Template only: every consumer below receives its own fresh copy.
    cloud_config = {"user-data": user_data, "boot-data-drive": "vda"}

    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    target_vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = dict(cloud_config)
    mock_prepare_vdu_affinity_group_list.return_value = []

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected = deepcopy(expected_extra_dict2)
    expected["params"]["cloud_config"] = dict(cloud_config)
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu, expected, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(
        target_vdu, None, dict(cloud_config)
    )
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
4888
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wthout_persistent_storage(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Neither the VDU nor the VNFD fixture carries persistent storage."""
    self.maxDiff = None

    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    target_vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected = deepcopy(expected_extra_dict2)
    db.get_one.return_value = deepcopy(vnfd_wthout_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    # Both disk helpers are still expected to be invoked once each.
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu, expected, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
4953
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_interfaces_partially_located(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Only some interfaces have a position.

    One of the three interfaces carries a position, so the partial-locate
    helper is expected to run instead of the sort helper.
    """
    self.maxDiff = None

    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
    ]
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )
    expected = deepcopy(expected_extra_dict)

    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu, expected, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # No instantiation volumes were supplied, so the find_* helpers stay idle.
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
5038
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_no_interface_position(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """None of the interfaces carries a position.

    The partial-locate helper is expected to be used; the sort helper must
    not be called.
    """
    self.maxDiff = None

    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )
    expected = deepcopy(expected_extra_dict)

    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu, expected, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
5112
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Prepare vdu interfaces method raises exception.

    ``_prepare_vdu_interfaces`` is stubbed to raise TypeError; the error is
    expected to reach the caller and none of the later helpers may run.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    mock_find_persistent_root_volumes.return_value = persistent_root_disk
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    mock_prepare_vdu_interfaces.side_effect = TypeError

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Assert the concrete exception type directly. The former
    # ``type(err) == TypeError`` check compared the assertRaises context
    # manager itself (not ``err.exception``), so it could never verify the
    # raised type.
    with self.assertRaises(TypeError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    # Everything after the failing helper must have been skipped.
    mock_prepare_vdu_cloud_init.assert_not_called()
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_not_called()
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
5179
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_add_persistent_root_disk_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Add persistent root disk method raises exception.

    ``_add_persistent_root_disk_to_disk_list`` is stubbed to raise KeyError;
    the helpers before it are expected to have run, the ones after it not.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Assert the concrete exception type directly. The former
    # ``type(err) == KeyError`` check compared the assertRaises context
    # manager itself (not ``err.exception``), so it could never verify the
    # raised type.
    with self.assertRaises(KeyError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    # At the time of the failure extra_dict only holds the image and flavor
    # dependencies.
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        {
            "depends_on": [
                f"{ns_preffix}:image.0",
                f"{ns_preffix}:flavor.0",
            ]
        },
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )

    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
5252
def test_select_persistent_root_disk(self):
    """The vsd is returned when it is the first virtual-storage-desc of the VDU."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage["virtual-storage-desc"][1])
    target = deepcopy(target_vdu_wth_persistent_storage)
    target["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]

    selected = Ns._select_persistent_root_disk(storage_desc, target)

    self.assertEqual(selected, storage_desc)
5264
def test_select_persistent_root_disk_first_vsd_is_different(self):
    """Nothing is selected when the VDU's first virtual-storage-desc is another id."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage["virtual-storage-desc"][1])
    target = deepcopy(target_vdu_wth_persistent_storage)
    # "persistent-volume2" comes first, ahead of the root volume.
    target["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]

    selected = Ns._select_persistent_root_disk(storage_desc, target)

    self.assertEqual(selected, None)
5277
def test_select_persistent_root_disk_vsd_is_not_persistent(self):
    """Nothing is selected for a vsd whose type-of-storage is ephemeral."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage["virtual-storage-desc"][1])
    storage_desc["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
    target = deepcopy(target_vdu_wth_persistent_storage)
    target["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]

    selected = Ns._select_persistent_root_disk(storage_desc, target)

    self.assertEqual(selected, None)
5291
def test_select_persistent_root_disk_vsd_does_not_have_size(self):
    """Nothing is selected for a vsd whose size-of-storage is None."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage["virtual-storage-desc"][1])
    storage_desc["size-of-storage"] = None
    target = deepcopy(target_vdu_wth_persistent_storage)
    target["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]

    selected = Ns._select_persistent_root_disk(storage_desc, target)

    self.assertEqual(selected, None)
5305
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """Nothing is selected when the VDU fixture is used without a virtual-storage-desc override."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage["virtual-storage-desc"][1])
    target = deepcopy(target_vdu_wth_persistent_storage)

    selected = Ns._select_persistent_root_disk(storage_desc, target)

    self.assertEqual(selected, None)
5313
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """Passing the whole vsd list instead of a single dict raises AttributeError."""
    target = deepcopy(target_vdu_wth_persistent_storage)
    storage_desc_list = deepcopy(vnfd_wth_persistent_storage["virtual-storage-desc"])

    with self.assertRaises(AttributeError):
        Ns._select_persistent_root_disk(storage_desc_list, target)