a2368195f5817e3d4a86288d3c8d506d69d76de6
[osm/RO.git] / NG-RO / osm_ng_ro / tests / test_ns.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy import deepcopy
18 import unittest
19 from unittest.mock import MagicMock, Mock, patch
20
21 from jinja2 import (
22 Environment,
23 select_autoescape,
24 StrictUndefined,
25 TemplateError,
26 TemplateNotFound,
27 UndefinedError,
28 )
29 from osm_ng_ro.ns import Ns, NsException
30
31
32 __author__ = "Eduardo Sousa"
33 __date__ = "$19-NOV-2021 00:00:00$"
34
35
36 # Variables used in Tests
37 vnfd_wth_persistent_storage = {
38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
39 "df": [
40 {
41 "id": "default-df",
42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
43 }
44 ],
45 "id": "several_volumes-vnf",
46 "product-name": "several_volumes-vnf",
47 "vdu": [
48 {
49 "id": "several_volumes-VM",
50 "name": "several_volumes-VM",
51 "sw-image-desc": "ubuntu20.04",
52 "alternative-sw-image-desc": [
53 "ubuntu20.04-aws",
54 "ubuntu20.04-azure",
55 ],
56 "virtual-storage-desc": [
57 "persistent-root-volume",
58 "persistent-volume2",
59 "ephemeral-volume",
60 ],
61 }
62 ],
63 "version": "1.0",
64 "virtual-storage-desc": [
65 {
66 "id": "persistent-volume2",
67 "type-of-storage": "persistent-storage:persistent-storage",
68 "size-of-storage": "10",
69 },
70 {
71 "id": "persistent-root-volume",
72 "type-of-storage": "persistent-storage:persistent-storage",
73 "size-of-storage": "10",
74 "vdu-storage-requirements": [
75 {"key": "keep-volume", "value": "true"},
76 ],
77 },
78 {
79 "id": "ephemeral-volume",
80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
81 "size-of-storage": "1",
82 },
83 ],
84 "_admin": {
85 "storage": {
86 "fs": "mongo",
87 "path": "/app/storage/",
88 },
89 "type": "vnfd",
90 },
91 }
92 vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3"
93 task_by_target_record_id = {
94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
95 "extra_dict": {"params": {"net_type": "SR-IOV"}}
96 }
97 }
98 interfaces_wthout_positions = [
99 {
100 "name": "vdu-eth1",
101 "ns-vld-id": "net1",
102 },
103 {
104 "name": "vdu-eth2",
105 "ns-vld-id": "net2",
106 },
107 {
108 "name": "vdu-eth3",
109 "ns-vld-id": "mgmtnet",
110 },
111 ]
112 interfaces_wth_all_positions = [
113 {
114 "name": "vdu-eth1",
115 "ns-vld-id": "net1",
116 "position": 2,
117 },
118 {
119 "name": "vdu-eth2",
120 "ns-vld-id": "net2",
121 "position": 0,
122 },
123 {
124 "name": "vdu-eth3",
125 "ns-vld-id": "mgmtnet",
126 "position": 1,
127 },
128 ]
129 target_vdu_wth_persistent_storage = {
130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
131 "ns-flavor-id": "0",
132 "ns-image-id": "0",
133 "vdu-name": "several_volumes-VM",
134 "interfaces": [
135 {
136 "name": "vdu-eth0",
137 "ns-vld-id": "mgmtnet",
138 }
139 ],
140 "virtual-storages": [
141 {
142 "id": "persistent-volume2",
143 "size-of-storage": "10",
144 "type-of-storage": "persistent-storage:persistent-storage",
145 },
146 {
147 "id": "persistent-root-volume",
148 "size-of-storage": "10",
149 "type-of-storage": "persistent-storage:persistent-storage",
150 "vdu-storage-requirements": [
151 {"key": "keep-volume", "value": "true"},
152 ],
153 },
154 {
155 "id": "ephemeral-volume",
156 "size-of-storage": "1",
157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
158 },
159 ],
160 }
161 db = MagicMock(name="database mock")
162 fs = MagicMock(name="database mock")
163 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
164 vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
165 vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
166 nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
167 indata = {
168 "name": "sample_name",
169 }
170 expected_extra_dict = {
171 "depends_on": [
172 f"{ns_preffix}:image.0",
173 f"{ns_preffix}:flavor.0",
174 ],
175 "params": {
176 "affinity_group_list": [],
177 "availability_zone_index": None,
178 "availability_zone_list": None,
179 "cloud_config": None,
180 "description": "several_volumes-VM",
181 "disk_list": [],
182 "flavor_id": f"TASK-{ns_preffix}:flavor.0",
183 "image_id": f"TASK-{ns_preffix}:image.0",
184 "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
185 "net_list": [],
186 "start": True,
187 },
188 }
189
190 expected_extra_dict2 = {
191 "depends_on": [
192 f"{ns_preffix}:image.0",
193 f"{ns_preffix}:flavor.0",
194 ],
195 "params": {
196 "affinity_group_list": [],
197 "availability_zone_index": None,
198 "availability_zone_list": None,
199 "cloud_config": None,
200 "description": "without_volumes-VM",
201 "disk_list": [],
202 "flavor_id": f"TASK-{ns_preffix}:flavor.0",
203 "image_id": f"TASK-{ns_preffix}:image.0",
204 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
205 "net_list": [],
206 "start": True,
207 },
208 }
209
210 expected_extra_dict3 = {
211 "depends_on": [
212 f"{ns_preffix}:image.0",
213 ],
214 "params": {
215 "affinity_group_list": [],
216 "availability_zone_index": None,
217 "availability_zone_list": None,
218 "cloud_config": None,
219 "description": "without_volumes-VM",
220 "disk_list": [],
221 "flavor_id": "flavor_test",
222 "image_id": f"TASK-{ns_preffix}:image.0",
223 "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
224 "net_list": [],
225 "start": True,
226 },
227 }
228 tasks_by_target_record_id = {
229 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
230 "extra_dict": {
231 "params": {
232 "net_type": "SR-IOV",
233 }
234 }
235 }
236 }
237 kwargs = {
238 "db": MagicMock(),
239 "vdu2cloud_init": {},
240 "vnfr": {
241 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
242 "member-vnf-index-ref": "vnf-several-volumes",
243 },
244 }
245 vnfd_wthout_persistent_storage = {
246 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
247 "df": [
248 {
249 "id": "default-df",
250 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
251 }
252 ],
253 "id": "without_volumes-vnf",
254 "product-name": "without_volumes-vnf",
255 "vdu": [
256 {
257 "id": "without_volumes-VM",
258 "name": "without_volumes-VM",
259 "sw-image-desc": "ubuntu20.04",
260 "alternative-sw-image-desc": [
261 "ubuntu20.04-aws",
262 "ubuntu20.04-azure",
263 ],
264 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
265 }
266 ],
267 "version": "1.0",
268 "virtual-storage-desc": [
269 {"id": "root-volume", "size-of-storage": "10"},
270 {
271 "id": "ephemeral-volume",
272 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
273 "size-of-storage": "1",
274 },
275 ],
276 "_admin": {
277 "storage": {
278 "fs": "mongo",
279 "path": "/app/storage/",
280 },
281 "type": "vnfd",
282 },
283 }
284
285 target_vdu_wthout_persistent_storage = {
286 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
287 "ns-flavor-id": "0",
288 "ns-image-id": "0",
289 "vdu-name": "without_volumes-VM",
290 "interfaces": [
291 {
292 "name": "vdu-eth0",
293 "ns-vld-id": "mgmtnet",
294 }
295 ],
296 "virtual-storages": [
297 {
298 "id": "root-volume",
299 "size-of-storage": "10",
300 },
301 {
302 "id": "ephemeral-volume",
303 "size-of-storage": "1",
304 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
305 },
306 ],
307 }
308 cloud_init_content = """
309 disk_setup:
310 ephemeral0:
311 table_type: {{type}}
312 layout: True
313 overwrite: {{is_override}}
314 runcmd:
315 - [ ls, -l, / ]
316 - [ sh, -xc, "echo $(date) '{{command}}'" ]
317 """
318
319 user_data = """
320 disk_setup:
321 ephemeral0:
322 table_type: mbr
323 layout: True
324 overwrite: False
325 runcmd:
326 - [ ls, -l, / ]
327 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
328 """
329
330
331 class CopyingMock(MagicMock):
332 def __call__(self, *args, **kwargs):
333 args = deepcopy(args)
334 kwargs = deepcopy(kwargs)
335 return super(CopyingMock, self).__call__(*args, **kwargs)
336
337
338 class TestNs(unittest.TestCase):
339 def setUp(self):
340 pass
341
342 def test__create_task_without_extra_dict(self):
343 expected_result = {
344 "target_id": "vim_openstack_1",
345 "action_id": "123456",
346 "nsr_id": "654321",
347 "task_id": "123456:1",
348 "status": "SCHEDULED",
349 "action": "CREATE",
350 "item": "test_item",
351 "target_record": "test_target_record",
352 "target_record_id": "test_target_record_id",
353 }
354 deployment_info = {
355 "action_id": "123456",
356 "nsr_id": "654321",
357 "task_index": 1,
358 }
359
360 task = Ns._create_task(
361 deployment_info=deployment_info,
362 target_id="vim_openstack_1",
363 item="test_item",
364 action="CREATE",
365 target_record="test_target_record",
366 target_record_id="test_target_record_id",
367 )
368
369 self.assertEqual(deployment_info.get("task_index"), 2)
370 self.assertDictEqual(task, expected_result)
371
372 def test__create_task(self):
373 expected_result = {
374 "target_id": "vim_openstack_1",
375 "action_id": "123456",
376 "nsr_id": "654321",
377 "task_id": "123456:1",
378 "status": "SCHEDULED",
379 "action": "CREATE",
380 "item": "test_item",
381 "target_record": "test_target_record",
382 "target_record_id": "test_target_record_id",
383 # values coming from extra_dict
384 "params": "test_params",
385 "find_params": "test_find_params",
386 "depends_on": "test_depends_on",
387 }
388 deployment_info = {
389 "action_id": "123456",
390 "nsr_id": "654321",
391 "task_index": 1,
392 }
393
394 task = Ns._create_task(
395 deployment_info=deployment_info,
396 target_id="vim_openstack_1",
397 item="test_item",
398 action="CREATE",
399 target_record="test_target_record",
400 target_record_id="test_target_record_id",
401 extra_dict={
402 "params": "test_params",
403 "find_params": "test_find_params",
404 "depends_on": "test_depends_on",
405 },
406 )
407
408 self.assertEqual(deployment_info.get("task_index"), 2)
409 self.assertDictEqual(task, expected_result)
410
411 @patch("osm_ng_ro.ns.time")
412 def test__create_ro_task(self, mock_time: Mock):
413 now = 1637324838.994551
414 mock_time.return_value = now
415 task = {
416 "target_id": "vim_openstack_1",
417 "action_id": "123456",
418 "nsr_id": "654321",
419 "task_id": "123456:1",
420 "status": "SCHEDULED",
421 "action": "CREATE",
422 "item": "test_item",
423 "target_record": "test_target_record",
424 "target_record_id": "test_target_record_id",
425 # values coming from extra_dict
426 "params": "test_params",
427 "find_params": "test_find_params",
428 "depends_on": "test_depends_on",
429 }
430 expected_result = {
431 "_id": "123456:1",
432 "locked_by": None,
433 "locked_at": 0.0,
434 "target_id": "vim_openstack_1",
435 "vim_info": {
436 "created": False,
437 "created_items": None,
438 "vim_id": None,
439 "vim_name": None,
440 "vim_status": None,
441 "vim_details": None,
442 "vim_message": None,
443 "refresh_at": None,
444 },
445 "modified_at": now,
446 "created_at": now,
447 "to_check_at": now,
448 "tasks": [task],
449 }
450
451 ro_task = Ns._create_ro_task(
452 target_id="vim_openstack_1",
453 task=task,
454 )
455
456 self.assertDictEqual(ro_task, expected_result)
457
458 def test__process_image_params_with_empty_target_image(self):
459 expected_result = {
460 "find_params": {},
461 }
462 target_image = {}
463
464 result = Ns._process_image_params(
465 target_image=target_image,
466 indata=None,
467 vim_info=None,
468 target_record_id=None,
469 )
470
471 self.assertDictEqual(expected_result, result)
472
473 def test__process_image_params_with_wrong_target_image(self):
474 expected_result = {
475 "find_params": {},
476 }
477 target_image = {
478 "no_image": "to_see_here",
479 }
480
481 result = Ns._process_image_params(
482 target_image=target_image,
483 indata=None,
484 vim_info=None,
485 target_record_id=None,
486 )
487
488 self.assertDictEqual(expected_result, result)
489
490 def test__process_image_params_with_image(self):
491 expected_result = {
492 "find_params": {
493 "filter_dict": {
494 "name": "cirros",
495 },
496 },
497 }
498 target_image = {
499 "image": "cirros",
500 }
501
502 result = Ns._process_image_params(
503 target_image=target_image,
504 indata=None,
505 vim_info=None,
506 target_record_id=None,
507 )
508
509 self.assertDictEqual(expected_result, result)
510
511 def test__process_image_params_with_vim_image_id(self):
512 expected_result = {
513 "find_params": {
514 "filter_dict": {
515 "id": "123456",
516 },
517 },
518 }
519 target_image = {
520 "vim_image_id": "123456",
521 }
522
523 result = Ns._process_image_params(
524 target_image=target_image,
525 indata=None,
526 vim_info=None,
527 target_record_id=None,
528 )
529
530 self.assertDictEqual(expected_result, result)
531
532 def test__process_image_params_with_image_checksum(self):
533 expected_result = {
534 "find_params": {
535 "filter_dict": {
536 "checksum": "e3fc50a88d0a364313df4b21ef20c29e",
537 },
538 },
539 }
540 target_image = {
541 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e",
542 }
543
544 result = Ns._process_image_params(
545 target_image=target_image,
546 indata=None,
547 vim_info=None,
548 target_record_id=None,
549 )
550
551 self.assertDictEqual(expected_result, result)
552
553 def test__get_resource_allocation_params_with_empty_target_image(self):
554 expected_result = {}
555 quota_descriptor = {}
556
557 result = Ns._get_resource_allocation_params(
558 quota_descriptor=quota_descriptor,
559 )
560
561 self.assertDictEqual(expected_result, result)
562
563 def test__get_resource_allocation_params_with_wrong_target_image(self):
564 expected_result = {}
565 quota_descriptor = {
566 "no_quota": "present_here",
567 }
568
569 result = Ns._get_resource_allocation_params(
570 quota_descriptor=quota_descriptor,
571 )
572
573 self.assertDictEqual(expected_result, result)
574
575 def test__get_resource_allocation_params_with_limit(self):
576 expected_result = {
577 "limit": 10,
578 }
579 quota_descriptor = {
580 "limit": "10",
581 }
582
583 result = Ns._get_resource_allocation_params(
584 quota_descriptor=quota_descriptor,
585 )
586
587 self.assertDictEqual(expected_result, result)
588
589 def test__get_resource_allocation_params_with_reserve(self):
590 expected_result = {
591 "reserve": 20,
592 }
593 quota_descriptor = {
594 "reserve": "20",
595 }
596
597 result = Ns._get_resource_allocation_params(
598 quota_descriptor=quota_descriptor,
599 )
600
601 self.assertDictEqual(expected_result, result)
602
603 def test__get_resource_allocation_params_with_shares(self):
604 expected_result = {
605 "shares": 30,
606 }
607 quota_descriptor = {
608 "shares": "30",
609 }
610
611 result = Ns._get_resource_allocation_params(
612 quota_descriptor=quota_descriptor,
613 )
614
615 self.assertDictEqual(expected_result, result)
616
617 def test__get_resource_allocation_params(self):
618 expected_result = {
619 "limit": 10,
620 "reserve": 20,
621 "shares": 30,
622 }
623 quota_descriptor = {
624 "limit": "10",
625 "reserve": "20",
626 "shares": "30",
627 }
628
629 result = Ns._get_resource_allocation_params(
630 quota_descriptor=quota_descriptor,
631 )
632
633 self.assertDictEqual(expected_result, result)
634
635 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
636 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
637 self,
638 resource_allocation,
639 ):
640 expected_result = {}
641 guest_epa_quota = {}
642 epa_vcpu_set = True
643
644 result = Ns._process_guest_epa_quota_params(
645 guest_epa_quota=guest_epa_quota,
646 epa_vcpu_set=epa_vcpu_set,
647 )
648
649 self.assertDictEqual(expected_result, result)
650 self.assertFalse(resource_allocation.called)
651
652 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
653 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
654 self,
655 resource_allocation,
656 ):
657 expected_result = {}
658 guest_epa_quota = {}
659 epa_vcpu_set = False
660
661 result = Ns._process_guest_epa_quota_params(
662 guest_epa_quota=guest_epa_quota,
663 epa_vcpu_set=epa_vcpu_set,
664 )
665
666 self.assertDictEqual(expected_result, result)
667 self.assertFalse(resource_allocation.called)
668
669 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
670 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
671 self,
672 resource_allocation,
673 ):
674 expected_result = {}
675 guest_epa_quota = {
676 "no-quota": "nothing",
677 }
678 epa_vcpu_set = True
679
680 result = Ns._process_guest_epa_quota_params(
681 guest_epa_quota=guest_epa_quota,
682 epa_vcpu_set=epa_vcpu_set,
683 )
684
685 self.assertDictEqual(expected_result, result)
686 self.assertFalse(resource_allocation.called)
687
688 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
689 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
690 self,
691 resource_allocation,
692 ):
693 expected_result = {}
694 guest_epa_quota = {
695 "no-quota": "nothing",
696 }
697 epa_vcpu_set = False
698
699 result = Ns._process_guest_epa_quota_params(
700 guest_epa_quota=guest_epa_quota,
701 epa_vcpu_set=epa_vcpu_set,
702 )
703
704 self.assertDictEqual(expected_result, result)
705 self.assertFalse(resource_allocation.called)
706
707 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
708 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
709 self,
710 resource_allocation,
711 ):
712 expected_result = {}
713 guest_epa_quota = {
714 "cpu-quota": {
715 "limit": "10",
716 "reserve": "20",
717 "shares": "30",
718 },
719 }
720 epa_vcpu_set = True
721
722 result = Ns._process_guest_epa_quota_params(
723 guest_epa_quota=guest_epa_quota,
724 epa_vcpu_set=epa_vcpu_set,
725 )
726
727 self.assertDictEqual(expected_result, result)
728 self.assertFalse(resource_allocation.called)
729
730 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
731 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
732 self,
733 resource_allocation,
734 ):
735 expected_result = {
736 "cpu-quota": {
737 "limit": 10,
738 "reserve": 20,
739 "shares": 30,
740 },
741 }
742 guest_epa_quota = {
743 "cpu-quota": {
744 "limit": "10",
745 "reserve": "20",
746 "shares": "30",
747 },
748 }
749 epa_vcpu_set = False
750
751 resource_allocation_param = {
752 "limit": "10",
753 "reserve": "20",
754 "shares": "30",
755 }
756 resource_allocation.return_value = {
757 "limit": 10,
758 "reserve": 20,
759 "shares": 30,
760 }
761
762 result = Ns._process_guest_epa_quota_params(
763 guest_epa_quota=guest_epa_quota,
764 epa_vcpu_set=epa_vcpu_set,
765 )
766
767 resource_allocation.assert_called_once_with(resource_allocation_param)
768 self.assertDictEqual(expected_result, result)
769
770 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
771 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
772 self,
773 resource_allocation,
774 ):
775 expected_result = {
776 "mem-quota": {
777 "limit": 10,
778 "reserve": 20,
779 "shares": 30,
780 },
781 }
782 guest_epa_quota = {
783 "mem-quota": {
784 "limit": "10",
785 "reserve": "20",
786 "shares": "30",
787 },
788 }
789 epa_vcpu_set = True
790
791 resource_allocation_param = {
792 "limit": "10",
793 "reserve": "20",
794 "shares": "30",
795 }
796 resource_allocation.return_value = {
797 "limit": 10,
798 "reserve": 20,
799 "shares": 30,
800 }
801
802 result = Ns._process_guest_epa_quota_params(
803 guest_epa_quota=guest_epa_quota,
804 epa_vcpu_set=epa_vcpu_set,
805 )
806
807 resource_allocation.assert_called_once_with(resource_allocation_param)
808 self.assertDictEqual(expected_result, result)
809
810 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
811 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
812 self,
813 resource_allocation,
814 ):
815 expected_result = {
816 "mem-quota": {
817 "limit": 10,
818 "reserve": 20,
819 "shares": 30,
820 },
821 }
822 guest_epa_quota = {
823 "mem-quota": {
824 "limit": "10",
825 "reserve": "20",
826 "shares": "30",
827 },
828 }
829 epa_vcpu_set = False
830
831 resource_allocation_param = {
832 "limit": "10",
833 "reserve": "20",
834 "shares": "30",
835 }
836 resource_allocation.return_value = {
837 "limit": 10,
838 "reserve": 20,
839 "shares": 30,
840 }
841
842 result = Ns._process_guest_epa_quota_params(
843 guest_epa_quota=guest_epa_quota,
844 epa_vcpu_set=epa_vcpu_set,
845 )
846
847 resource_allocation.assert_called_once_with(resource_allocation_param)
848 self.assertDictEqual(expected_result, result)
849
850 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
851 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
852 self,
853 resource_allocation,
854 ):
855 expected_result = {
856 "disk-io-quota": {
857 "limit": 10,
858 "reserve": 20,
859 "shares": 30,
860 },
861 }
862 guest_epa_quota = {
863 "disk-io-quota": {
864 "limit": "10",
865 "reserve": "20",
866 "shares": "30",
867 },
868 }
869 epa_vcpu_set = True
870
871 resource_allocation_param = {
872 "limit": "10",
873 "reserve": "20",
874 "shares": "30",
875 }
876 resource_allocation.return_value = {
877 "limit": 10,
878 "reserve": 20,
879 "shares": 30,
880 }
881
882 result = Ns._process_guest_epa_quota_params(
883 guest_epa_quota=guest_epa_quota,
884 epa_vcpu_set=epa_vcpu_set,
885 )
886
887 resource_allocation.assert_called_once_with(resource_allocation_param)
888 self.assertDictEqual(expected_result, result)
889
890 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
891 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
892 self,
893 resource_allocation,
894 ):
895 expected_result = {
896 "disk-io-quota": {
897 "limit": 10,
898 "reserve": 20,
899 "shares": 30,
900 },
901 }
902 guest_epa_quota = {
903 "disk-io-quota": {
904 "limit": "10",
905 "reserve": "20",
906 "shares": "30",
907 },
908 }
909 epa_vcpu_set = False
910
911 resource_allocation_param = {
912 "limit": "10",
913 "reserve": "20",
914 "shares": "30",
915 }
916 resource_allocation.return_value = {
917 "limit": 10,
918 "reserve": 20,
919 "shares": 30,
920 }
921
922 result = Ns._process_guest_epa_quota_params(
923 guest_epa_quota=guest_epa_quota,
924 epa_vcpu_set=epa_vcpu_set,
925 )
926
927 resource_allocation.assert_called_once_with(resource_allocation_param)
928 self.assertDictEqual(expected_result, result)
929
930 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
931 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
932 self,
933 resource_allocation,
934 ):
935 expected_result = {
936 "vif-quota": {
937 "limit": 10,
938 "reserve": 20,
939 "shares": 30,
940 },
941 }
942 guest_epa_quota = {
943 "vif-quota": {
944 "limit": "10",
945 "reserve": "20",
946 "shares": "30",
947 },
948 }
949 epa_vcpu_set = True
950
951 resource_allocation_param = {
952 "limit": "10",
953 "reserve": "20",
954 "shares": "30",
955 }
956 resource_allocation.return_value = {
957 "limit": 10,
958 "reserve": 20,
959 "shares": 30,
960 }
961
962 result = Ns._process_guest_epa_quota_params(
963 guest_epa_quota=guest_epa_quota,
964 epa_vcpu_set=epa_vcpu_set,
965 )
966
967 resource_allocation.assert_called_once_with(resource_allocation_param)
968 self.assertDictEqual(expected_result, result)
969
970 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
971 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
972 self,
973 resource_allocation,
974 ):
975 expected_result = {
976 "vif-quota": {
977 "limit": 10,
978 "reserve": 20,
979 "shares": 30,
980 },
981 }
982 guest_epa_quota = {
983 "vif-quota": {
984 "limit": "10",
985 "reserve": "20",
986 "shares": "30",
987 },
988 }
989 epa_vcpu_set = False
990
991 resource_allocation_param = {
992 "limit": "10",
993 "reserve": "20",
994 "shares": "30",
995 }
996 resource_allocation.return_value = {
997 "limit": 10,
998 "reserve": 20,
999 "shares": 30,
1000 }
1001
1002 result = Ns._process_guest_epa_quota_params(
1003 guest_epa_quota=guest_epa_quota,
1004 epa_vcpu_set=epa_vcpu_set,
1005 )
1006
1007 resource_allocation.assert_called_once_with(resource_allocation_param)
1008 self.assertDictEqual(expected_result, result)
1009
1010 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1011 def test__process_guest_epa_quota_params_with_quota_epa_cpu(
1012 self,
1013 resource_allocation,
1014 ):
1015 expected_result = {
1016 "mem-quota": {
1017 "limit": 10,
1018 "reserve": 20,
1019 "shares": 30,
1020 },
1021 "disk-io-quota": {
1022 "limit": 10,
1023 "reserve": 20,
1024 "shares": 30,
1025 },
1026 "vif-quota": {
1027 "limit": 10,
1028 "reserve": 20,
1029 "shares": 30,
1030 },
1031 }
1032 guest_epa_quota = {
1033 "cpu-quota": {
1034 "limit": "10",
1035 "reserve": "20",
1036 "shares": "30",
1037 },
1038 "mem-quota": {
1039 "limit": "10",
1040 "reserve": "20",
1041 "shares": "30",
1042 },
1043 "disk-io-quota": {
1044 "limit": "10",
1045 "reserve": "20",
1046 "shares": "30",
1047 },
1048 "vif-quota": {
1049 "limit": "10",
1050 "reserve": "20",
1051 "shares": "30",
1052 },
1053 }
1054 epa_vcpu_set = True
1055
1056 resource_allocation.return_value = {
1057 "limit": 10,
1058 "reserve": 20,
1059 "shares": 30,
1060 }
1061
1062 result = Ns._process_guest_epa_quota_params(
1063 guest_epa_quota=guest_epa_quota,
1064 epa_vcpu_set=epa_vcpu_set,
1065 )
1066
1067 self.assertTrue(resource_allocation.called)
1068 self.assertDictEqual(expected_result, result)
1069
1070 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
1071 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
1072 self,
1073 resource_allocation,
1074 ):
1075 expected_result = {
1076 "cpu-quota": {
1077 "limit": 10,
1078 "reserve": 20,
1079 "shares": 30,
1080 },
1081 "mem-quota": {
1082 "limit": 10,
1083 "reserve": 20,
1084 "shares": 30,
1085 },
1086 "disk-io-quota": {
1087 "limit": 10,
1088 "reserve": 20,
1089 "shares": 30,
1090 },
1091 "vif-quota": {
1092 "limit": 10,
1093 "reserve": 20,
1094 "shares": 30,
1095 },
1096 }
1097 guest_epa_quota = {
1098 "cpu-quota": {
1099 "limit": "10",
1100 "reserve": "20",
1101 "shares": "30",
1102 },
1103 "mem-quota": {
1104 "limit": "10",
1105 "reserve": "20",
1106 "shares": "30",
1107 },
1108 "disk-io-quota": {
1109 "limit": "10",
1110 "reserve": "20",
1111 "shares": "30",
1112 },
1113 "vif-quota": {
1114 "limit": "10",
1115 "reserve": "20",
1116 "shares": "30",
1117 },
1118 }
1119 epa_vcpu_set = False
1120
1121 resource_allocation.return_value = {
1122 "limit": 10,
1123 "reserve": 20,
1124 "shares": 30,
1125 }
1126
1127 result = Ns._process_guest_epa_quota_params(
1128 guest_epa_quota=guest_epa_quota,
1129 epa_vcpu_set=epa_vcpu_set,
1130 )
1131
1132 self.assertTrue(resource_allocation.called)
1133 self.assertDictEqual(expected_result, result)
1134
1135 def test__process_guest_epa_numa_params_with_empty_numa_params(self):
1136 expected_numa_result = []
1137 expected_epa_vcpu_set_result = False
1138 guest_epa_quota = {}
1139
1140 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1141 guest_epa_quota=guest_epa_quota,
1142 )
1143 self.assertEqual(expected_numa_result, numa_result)
1144 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1145
1146 def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
1147 expected_numa_result = []
1148 expected_epa_vcpu_set_result = False
1149 guest_epa_quota = {"no_nume": "here"}
1150
1151 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1152 guest_epa_quota=guest_epa_quota,
1153 )
1154
1155 self.assertEqual(expected_numa_result, numa_result)
1156 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1157
1158 def test__process_guest_epa_numa_params_with_numa_node_policy(self):
1159 expected_numa_result = []
1160 expected_epa_vcpu_set_result = False
1161 guest_epa_quota = {"numa-node-policy": {}}
1162
1163 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1164 guest_epa_quota=guest_epa_quota,
1165 )
1166
1167 self.assertEqual(expected_numa_result, numa_result)
1168 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1169
1170 def test__process_guest_epa_numa_params_with_no_node(self):
1171 expected_numa_result = []
1172 expected_epa_vcpu_set_result = False
1173 guest_epa_quota = {
1174 "numa-node-policy": {
1175 "node": [],
1176 },
1177 }
1178
1179 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1180 guest_epa_quota=guest_epa_quota,
1181 )
1182
1183 self.assertEqual(expected_numa_result, numa_result)
1184 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1185
1186 def test__process_guest_epa_numa_params_with_1_node_num_cores(self):
1187 expected_numa_result = [{"cores": 3}]
1188 expected_epa_vcpu_set_result = True
1189 guest_epa_quota = {
1190 "numa-node-policy": {
1191 "node": [
1192 {
1193 "num-cores": 3,
1194 },
1195 ],
1196 },
1197 }
1198
1199 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1200 guest_epa_quota=guest_epa_quota,
1201 )
1202
1203 self.assertEqual(expected_numa_result, numa_result)
1204 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1205
1206 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self):
1207 expected_numa_result = [{"paired_threads": 3}]
1208 expected_epa_vcpu_set_result = True
1209 guest_epa_quota = {
1210 "numa-node-policy": {
1211 "node": [
1212 {
1213 "paired-threads": {"num-paired-threads": "3"},
1214 },
1215 ],
1216 },
1217 }
1218
1219 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1220 guest_epa_quota=guest_epa_quota,
1221 )
1222
1223 self.assertEqual(expected_numa_result, numa_result)
1224 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1225
1226 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self):
1227 expected_numa_result = [
1228 {
1229 "paired-threads-id": [("0", "1"), ("4", "5")],
1230 }
1231 ]
1232 expected_epa_vcpu_set_result = False
1233 guest_epa_quota = {
1234 "numa-node-policy": {
1235 "node": [
1236 {
1237 "paired-threads": {
1238 "paired-thread-ids": [
1239 {
1240 "thread-a": 0,
1241 "thread-b": 1,
1242 },
1243 {
1244 "thread-a": 4,
1245 "thread-b": 5,
1246 },
1247 ],
1248 },
1249 },
1250 ],
1251 },
1252 }
1253
1254 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1255 guest_epa_quota=guest_epa_quota,
1256 )
1257
1258 self.assertEqual(expected_numa_result, numa_result)
1259 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1260
1261 def test__process_guest_epa_numa_params_with_1_node_num_threads(self):
1262 expected_numa_result = [{"threads": 3}]
1263 expected_epa_vcpu_set_result = True
1264 guest_epa_quota = {
1265 "numa-node-policy": {
1266 "node": [
1267 {
1268 "num-threads": "3",
1269 },
1270 ],
1271 },
1272 }
1273
1274 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1275 guest_epa_quota=guest_epa_quota,
1276 )
1277
1278 self.assertEqual(expected_numa_result, numa_result)
1279 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1280
1281 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self):
1282 expected_numa_result = [{"memory": 2}]
1283 expected_epa_vcpu_set_result = False
1284 guest_epa_quota = {
1285 "numa-node-policy": {
1286 "node": [
1287 {
1288 "memory-mb": 2048,
1289 },
1290 ],
1291 },
1292 }
1293
1294 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1295 guest_epa_quota=guest_epa_quota,
1296 )
1297
1298 self.assertEqual(expected_numa_result, numa_result)
1299 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1300
1301 def test__process_guest_epa_numa_params_with_1_node_vcpu(self):
1302 expected_numa_result = [
1303 {
1304 "id": 0,
1305 "vcpu": [0, 1],
1306 }
1307 ]
1308 expected_epa_vcpu_set_result = False
1309 guest_epa_quota = {
1310 "numa-node-policy": {
1311 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
1312 },
1313 }
1314
1315 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1316 guest_epa_quota=guest_epa_quota,
1317 )
1318
1319 self.assertEqual(expected_numa_result, numa_result)
1320 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1321
1322 def test__process_guest_epa_numa_params_with_2_node_vcpu(self):
1323 expected_numa_result = [
1324 {
1325 "id": 0,
1326 "vcpu": [0, 1],
1327 },
1328 {
1329 "id": 1,
1330 "vcpu": [2, 3],
1331 },
1332 ]
1333
1334 expected_epa_vcpu_set_result = False
1335 guest_epa_quota = {
1336 "numa-node-policy": {
1337 "node": [
1338 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
1339 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
1340 ],
1341 },
1342 }
1343
1344 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1345 guest_epa_quota=guest_epa_quota,
1346 )
1347
1348 self.assertEqual(expected_numa_result, numa_result)
1349 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1350
1351 def test__process_guest_epa_numa_params_with_1_node(self):
1352 expected_numa_result = [
1353 {
1354 # "id": 0,
1355 # "vcpu": [0, 1],
1356 "cores": 3,
1357 "paired_threads": 3,
1358 "paired-threads-id": [("0", "1"), ("4", "5")],
1359 "threads": 3,
1360 "memory": 2,
1361 }
1362 ]
1363 expected_epa_vcpu_set_result = True
1364 guest_epa_quota = {
1365 "numa-node-policy": {
1366 "node": [
1367 {
1368 "num-cores": 3,
1369 "paired-threads": {
1370 "num-paired-threads": "3",
1371 "paired-thread-ids": [
1372 {
1373 "thread-a": 0,
1374 "thread-b": 1,
1375 },
1376 {
1377 "thread-a": 4,
1378 "thread-b": 5,
1379 },
1380 ],
1381 },
1382 "num-threads": "3",
1383 "memory-mb": 2048,
1384 },
1385 ],
1386 },
1387 }
1388
1389 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1390 guest_epa_quota=guest_epa_quota,
1391 )
1392
1393 self.assertEqual(expected_numa_result, numa_result)
1394 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1395
1396 def test__process_guest_epa_numa_params_with_2_nodes(self):
1397 expected_numa_result = [
1398 {
1399 "cores": 3,
1400 "paired_threads": 3,
1401 "paired-threads-id": [("0", "1"), ("4", "5")],
1402 "threads": 3,
1403 "memory": 2,
1404 },
1405 {
1406 "cores": 7,
1407 "paired_threads": 7,
1408 "paired-threads-id": [("2", "3"), ("5", "6")],
1409 "threads": 4,
1410 "memory": 4,
1411 },
1412 ]
1413 expected_epa_vcpu_set_result = True
1414 guest_epa_quota = {
1415 "numa-node-policy": {
1416 "node": [
1417 {
1418 "num-cores": 3,
1419 "paired-threads": {
1420 "num-paired-threads": "3",
1421 "paired-thread-ids": [
1422 {
1423 "thread-a": 0,
1424 "thread-b": 1,
1425 },
1426 {
1427 "thread-a": 4,
1428 "thread-b": 5,
1429 },
1430 ],
1431 },
1432 "num-threads": "3",
1433 "memory-mb": 2048,
1434 },
1435 {
1436 "num-cores": 7,
1437 "paired-threads": {
1438 "num-paired-threads": "7",
1439 "paired-thread-ids": [
1440 {
1441 "thread-a": 2,
1442 "thread-b": 3,
1443 },
1444 {
1445 "thread-a": 5,
1446 "thread-b": 6,
1447 },
1448 ],
1449 },
1450 "num-threads": "4",
1451 "memory-mb": 4096,
1452 },
1453 ],
1454 },
1455 }
1456
1457 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
1458 guest_epa_quota=guest_epa_quota,
1459 )
1460
1461 self.assertEqual(expected_numa_result, numa_result)
1462 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1463
1464 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self):
1465 expected_numa_result = {}
1466 expected_epa_vcpu_set_result = False
1467 guest_epa_quota = {}
1468 vcpu_count = 0
1469 epa_vcpu_set = False
1470
1471 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1472 guest_epa_quota=guest_epa_quota,
1473 vcpu_count=vcpu_count,
1474 epa_vcpu_set=epa_vcpu_set,
1475 )
1476
1477 self.assertDictEqual(expected_numa_result, numa_result)
1478 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1479
1480 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self):
1481 expected_numa_result = {}
1482 expected_epa_vcpu_set_result = False
1483 guest_epa_quota = {
1484 "no-cpu-pinning-policy": "here",
1485 }
1486 vcpu_count = 0
1487 epa_vcpu_set = False
1488
1489 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1490 guest_epa_quota=guest_epa_quota,
1491 vcpu_count=vcpu_count,
1492 epa_vcpu_set=epa_vcpu_set,
1493 )
1494
1495 self.assertDictEqual(expected_numa_result, numa_result)
1496 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1497
1498 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self):
1499 expected_numa_result = {}
1500 expected_epa_vcpu_set_result = True
1501 guest_epa_quota = {
1502 "cpu-pinning-policy": "DEDICATED",
1503 }
1504 vcpu_count = 0
1505 epa_vcpu_set = True
1506
1507 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1508 guest_epa_quota=guest_epa_quota,
1509 vcpu_count=vcpu_count,
1510 epa_vcpu_set=epa_vcpu_set,
1511 )
1512
1513 self.assertDictEqual(expected_numa_result, numa_result)
1514 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1515
1516 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self):
1517 expected_numa_result = {"threads": 3}
1518 expected_epa_vcpu_set_result = True
1519 guest_epa_quota = {
1520 "cpu-pinning-policy": "DEDICATED",
1521 "cpu-thread-pinning-policy": "PREFER",
1522 }
1523 vcpu_count = 3
1524 epa_vcpu_set = False
1525
1526 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1527 guest_epa_quota=guest_epa_quota,
1528 vcpu_count=vcpu_count,
1529 epa_vcpu_set=epa_vcpu_set,
1530 )
1531
1532 self.assertDictEqual(expected_numa_result, numa_result)
1533 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1534
1535 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self):
1536 expected_numa_result = {"cores": 3}
1537 expected_epa_vcpu_set_result = True
1538 guest_epa_quota = {
1539 "cpu-pinning-policy": "DEDICATED",
1540 "cpu-thread-pinning-policy": "ISOLATE",
1541 }
1542 vcpu_count = 3
1543 epa_vcpu_set = False
1544
1545 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1546 guest_epa_quota=guest_epa_quota,
1547 vcpu_count=vcpu_count,
1548 epa_vcpu_set=epa_vcpu_set,
1549 )
1550
1551 self.assertDictEqual(expected_numa_result, numa_result)
1552 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1553
1554 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self):
1555 expected_numa_result = {"threads": 3}
1556 expected_epa_vcpu_set_result = True
1557 guest_epa_quota = {
1558 "cpu-pinning-policy": "DEDICATED",
1559 "cpu-thread-pinning-policy": "REQUIRE",
1560 }
1561 vcpu_count = 3
1562 epa_vcpu_set = False
1563
1564 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1565 guest_epa_quota=guest_epa_quota,
1566 vcpu_count=vcpu_count,
1567 epa_vcpu_set=epa_vcpu_set,
1568 )
1569
1570 self.assertDictEqual(expected_numa_result, numa_result)
1571 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1572
1573 def test__process_guest_epa_cpu_pinning_params(self):
1574 expected_numa_result = {"threads": 3}
1575 expected_epa_vcpu_set_result = True
1576 guest_epa_quota = {
1577 "cpu-pinning-policy": "DEDICATED",
1578 }
1579 vcpu_count = 3
1580 epa_vcpu_set = False
1581
1582 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params(
1583 guest_epa_quota=guest_epa_quota,
1584 vcpu_count=vcpu_count,
1585 epa_vcpu_set=epa_vcpu_set,
1586 )
1587
1588 self.assertDictEqual(expected_numa_result, numa_result)
1589 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)
1590
1591 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1592 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1593 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1594 def test__process_guest_epa_params_with_empty_params(
1595 self,
1596 guest_epa_numa_params,
1597 guest_epa_cpu_pinning_params,
1598 guest_epa_quota_params,
1599 ):
1600 expected_result = {}
1601 target_flavor = {}
1602
1603 result = Ns._process_epa_params(
1604 target_flavor=target_flavor,
1605 )
1606
1607 self.assertDictEqual(expected_result, result)
1608 self.assertFalse(guest_epa_numa_params.called)
1609 self.assertFalse(guest_epa_cpu_pinning_params.called)
1610 self.assertFalse(guest_epa_quota_params.called)
1611
1612 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1613 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1614 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1615 def test__process_guest_epa_params_with_wrong_params(
1616 self,
1617 guest_epa_numa_params,
1618 guest_epa_cpu_pinning_params,
1619 guest_epa_quota_params,
1620 ):
1621 expected_result = {}
1622 target_flavor = {
1623 "no-guest-epa": "here",
1624 }
1625
1626 result = Ns._process_epa_params(
1627 target_flavor=target_flavor,
1628 )
1629
1630 self.assertDictEqual(expected_result, result)
1631 self.assertFalse(guest_epa_numa_params.called)
1632 self.assertFalse(guest_epa_cpu_pinning_params.called)
1633 self.assertFalse(guest_epa_quota_params.called)
1634
1635 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1636 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1637 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1638 def test__process_guest_epa_params(
1639 self,
1640 guest_epa_numa_params,
1641 guest_epa_cpu_pinning_params,
1642 guest_epa_quota_params,
1643 ):
1644 expected_result = {
1645 "mem-policy": "STRICT",
1646 }
1647 target_flavor = {
1648 "guest-epa": {
1649 "vcpu-count": 1,
1650 "numa-node-policy": {
1651 "mem-policy": "STRICT",
1652 },
1653 },
1654 }
1655
1656 guest_epa_numa_params.return_value = ({}, False)
1657 guest_epa_cpu_pinning_params.return_value = ({}, False)
1658 guest_epa_quota_params.return_value = {}
1659
1660 result = Ns._process_epa_params(
1661 target_flavor=target_flavor,
1662 )
1663
1664 self.assertDictEqual(expected_result, result)
1665 self.assertTrue(guest_epa_numa_params.called)
1666 self.assertTrue(guest_epa_cpu_pinning_params.called)
1667 self.assertTrue(guest_epa_quota_params.called)
1668
1669 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1670 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1671 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1672 def test__process_guest_epa_params_with_mempage_size(
1673 self,
1674 guest_epa_numa_params,
1675 guest_epa_cpu_pinning_params,
1676 guest_epa_quota_params,
1677 ):
1678 expected_result = {
1679 "mempage-size": "1G",
1680 "mem-policy": "STRICT",
1681 }
1682 target_flavor = {
1683 "guest-epa": {
1684 "vcpu-count": 1,
1685 "mempage-size": "1G",
1686 "numa-node-policy": {
1687 "mem-policy": "STRICT",
1688 },
1689 },
1690 }
1691
1692 guest_epa_numa_params.return_value = ({}, False)
1693 guest_epa_cpu_pinning_params.return_value = ({}, False)
1694 guest_epa_quota_params.return_value = {}
1695
1696 result = Ns._process_epa_params(
1697 target_flavor=target_flavor,
1698 )
1699
1700 self.assertDictEqual(expected_result, result)
1701 self.assertTrue(guest_epa_numa_params.called)
1702 self.assertTrue(guest_epa_cpu_pinning_params.called)
1703 self.assertTrue(guest_epa_quota_params.called)
1704
1705 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
1706 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
1707 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
1708 def test__process_guest_epa_params_with_numa(
1709 self,
1710 guest_epa_numa_params,
1711 guest_epa_cpu_pinning_params,
1712 guest_epa_quota_params,
1713 ):
1714 expected_result = {
1715 "mempage-size": "1G",
1716 "cpu-pinning-policy": "DEDICATED",
1717 "cpu-thread-pinning-policy": "PREFER",
1718 "numas": [
1719 {
1720 "cores": 3,
1721 "memory": 2,
1722 "paired-threads": 3,
1723 "paired-threads-id": [("0", "1"), ("4", "5")],
1724 "threads": 3,
1725 }
1726 ],
1727 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
1728 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
1729 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
1730 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
1731 }
1732 target_flavor = {
1733 "guest-epa": {
1734 "vcpu-count": 1,
1735 "mempage-size": "1G",
1736 "cpu-pinning-policy": "DEDICATED",
1737 "cpu-thread-pinning-policy": "PREFER",
1738 "numa-node-policy": {
1739 "node": [
1740 {
1741 "num-cores": 3,
1742 "paired-threads": {
1743 "num-paired-threads": "3",
1744 "paired-thread-ids": [
1745 {
1746 "thread-a": 0,
1747 "thread-b": 1,
1748 },
1749 {
1750 "thread-a": 4,
1751 "thread-b": 5,
1752 },
1753 ],
1754 },
1755 "num-threads": "3",
1756 "memory-mb": 2048,
1757 },
1758 ],
1759 },
1760 "cpu-quota": {
1761 "limit": "10",
1762 "reserve": "20",
1763 "shares": "30",
1764 },
1765 "mem-quota": {
1766 "limit": "10",
1767 "reserve": "20",
1768 "shares": "30",
1769 },
1770 "disk-io-quota": {
1771 "limit": "10",
1772 "reserve": "20",
1773 "shares": "30",
1774 },
1775 "vif-quota": {
1776 "limit": "10",
1777 "reserve": "20",
1778 "shares": "30",
1779 },
1780 },
1781 }
1782
1783 guest_epa_numa_params.return_value = (
1784 [
1785 {
1786 "cores": 3,
1787 "paired-threads": 3,
1788 "paired-threads-id": [("0", "1"), ("4", "5")],
1789 "threads": 3,
1790 "memory": 2,
1791 },
1792 ],
1793 True,
1794 )
1795 guest_epa_cpu_pinning_params.return_value = (
1796 {
1797 "threads": 3,
1798 },
1799 True,
1800 )
1801 guest_epa_quota_params.return_value = {
1802 "cpu-quota": {
1803 "limit": 10,
1804 "reserve": 20,
1805 "shares": 30,
1806 },
1807 "mem-quota": {
1808 "limit": 10,
1809 "reserve": 20,
1810 "shares": 30,
1811 },
1812 "disk-io-quota": {
1813 "limit": 10,
1814 "reserve": 20,
1815 "shares": 30,
1816 },
1817 "vif-quota": {
1818 "limit": 10,
1819 "reserve": 20,
1820 "shares": 30,
1821 },
1822 }
1823
1824 result = Ns._process_epa_params(
1825 target_flavor=target_flavor,
1826 )
1827 self.assertEqual(expected_result, result)
1828 self.assertTrue(guest_epa_numa_params.called)
1829 self.assertTrue(guest_epa_cpu_pinning_params.called)
1830 self.assertTrue(guest_epa_quota_params.called)
1831
1832 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1833 def test__process_flavor_params_with_empty_target_flavor(
1834 self,
1835 epa_params,
1836 ):
1837 target_flavor = {}
1838 indata = {
1839 "vnf": [
1840 {
1841 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1842 },
1843 ],
1844 }
1845 vim_info = {}
1846 target_record_id = ""
1847
1848 with self.assertRaises(KeyError):
1849 Ns._process_flavor_params(
1850 target_flavor=target_flavor,
1851 indata=indata,
1852 vim_info=vim_info,
1853 target_record_id=target_record_id,
1854 )
1855
1856 self.assertFalse(epa_params.called)
1857
1858 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1859 def test__process_flavor_params_with_wrong_target_flavor(
1860 self,
1861 epa_params,
1862 ):
1863 target_flavor = {
1864 "no-target-flavor": "here",
1865 }
1866 indata = {}
1867 vim_info = {}
1868 target_record_id = ""
1869
1870 with self.assertRaises(KeyError):
1871 Ns._process_flavor_params(
1872 target_flavor=target_flavor,
1873 indata=indata,
1874 vim_info=vim_info,
1875 target_record_id=target_record_id,
1876 )
1877
1878 self.assertFalse(epa_params.called)
1879
1880 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1881 def test__process_flavor_params_with_empty_indata(
1882 self,
1883 epa_params,
1884 ):
1885 expected_result = {
1886 "find_params": {
1887 "flavor_data": {
1888 "disk": 10,
1889 "ram": 1024,
1890 "vcpus": 2,
1891 },
1892 },
1893 "params": {
1894 "flavor_data": {
1895 "disk": 10,
1896 "name": "test",
1897 "ram": 1024,
1898 "vcpus": 2,
1899 },
1900 },
1901 }
1902 target_flavor = {
1903 "name": "test",
1904 "storage-gb": "10",
1905 "memory-mb": "1024",
1906 "vcpu-count": "2",
1907 }
1908 indata = {}
1909 vim_info = {}
1910 target_record_id = ""
1911
1912 epa_params.return_value = {}
1913
1914 result = Ns._process_flavor_params(
1915 target_flavor=target_flavor,
1916 indata=indata,
1917 vim_info=vim_info,
1918 target_record_id=target_record_id,
1919 )
1920
1921 self.assertTrue(epa_params.called)
1922 self.assertDictEqual(result, expected_result)
1923
1924 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1925 def test__process_flavor_params_with_wrong_indata(
1926 self,
1927 epa_params,
1928 ):
1929 expected_result = {
1930 "find_params": {
1931 "flavor_data": {
1932 "disk": 10,
1933 "ram": 1024,
1934 "vcpus": 2,
1935 },
1936 },
1937 "params": {
1938 "flavor_data": {
1939 "disk": 10,
1940 "name": "test",
1941 "ram": 1024,
1942 "vcpus": 2,
1943 },
1944 },
1945 }
1946 target_flavor = {
1947 "name": "test",
1948 "storage-gb": "10",
1949 "memory-mb": "1024",
1950 "vcpu-count": "2",
1951 }
1952 indata = {
1953 "no-vnf": "here",
1954 }
1955 vim_info = {}
1956 target_record_id = ""
1957
1958 epa_params.return_value = {}
1959
1960 result = Ns._process_flavor_params(
1961 target_flavor=target_flavor,
1962 indata=indata,
1963 vim_info=vim_info,
1964 target_record_id=target_record_id,
1965 )
1966
1967 self.assertTrue(epa_params.called)
1968 self.assertDictEqual(result, expected_result)
1969
1970 @patch("osm_ng_ro.ns.Ns._process_epa_params")
1971 def test__process_flavor_params_with_ephemeral_disk(
1972 self,
1973 epa_params,
1974 ):
1975 kwargs = {
1976 "db": db,
1977 }
1978
1979 db.get_one.return_value = {
1980 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
1981 "df": [
1982 {
1983 "id": "default-df",
1984 "vdu-profile": [
1985 {"id": "without_volumes-VM", "min-number-of-instances": 1}
1986 ],
1987 }
1988 ],
1989 "id": "without_volumes-vnf",
1990 "product-name": "without_volumes-vnf",
1991 "vdu": [
1992 {
1993 "id": "without_volumes-VM",
1994 "name": "without_volumes-VM",
1995 "sw-image-desc": "ubuntu20.04",
1996 "alternative-sw-image-desc": [
1997 "ubuntu20.04-aws",
1998 "ubuntu20.04-azure",
1999 ],
2000 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2001 }
2002 ],
2003 "version": "1.0",
2004 "virtual-storage-desc": [
2005 {"id": "root-volume", "size-of-storage": "10"},
2006 {
2007 "id": "ephemeral-volume",
2008 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2009 "size-of-storage": "1",
2010 },
2011 ],
2012 "_admin": {
2013 "storage": {
2014 "fs": "mongo",
2015 "path": "/app/storage/",
2016 },
2017 "type": "vnfd",
2018 },
2019 }
2020 expected_result = {
2021 "find_params": {
2022 "flavor_data": {
2023 "disk": 10,
2024 "ram": 1024,
2025 "vcpus": 2,
2026 "ephemeral": 10,
2027 },
2028 },
2029 "params": {
2030 "flavor_data": {
2031 "disk": 10,
2032 "name": "test",
2033 "ram": 1024,
2034 "vcpus": 2,
2035 "ephemeral": 10,
2036 },
2037 },
2038 }
2039 target_flavor = {
2040 "id": "test_id",
2041 "name": "test",
2042 "storage-gb": "10",
2043 "memory-mb": "1024",
2044 "vcpu-count": "2",
2045 }
2046 indata = {
2047 "vnf": [
2048 {
2049 "vdur": [
2050 {
2051 "ns-flavor-id": "test_id",
2052 "virtual-storages": [
2053 {
2054 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2055 "size-of-storage": "10",
2056 },
2057 ],
2058 },
2059 ],
2060 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2061 },
2062 ],
2063 }
2064 vim_info = {}
2065 target_record_id = ""
2066
2067 epa_params.return_value = {}
2068
2069 result = Ns._process_flavor_params(
2070 target_flavor=target_flavor,
2071 indata=indata,
2072 vim_info=vim_info,
2073 target_record_id=target_record_id,
2074 **kwargs,
2075 )
2076
2077 self.assertTrue(epa_params.called)
2078 self.assertDictEqual(result, expected_result)
2079
2080 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2081 def test__process_flavor_params_with_swap_disk(
2082 self,
2083 epa_params,
2084 ):
2085 expected_result = {
2086 "find_params": {
2087 "flavor_data": {
2088 "disk": 10,
2089 "ram": 1024,
2090 "vcpus": 2,
2091 "swap": 20,
2092 },
2093 },
2094 "params": {
2095 "flavor_data": {
2096 "disk": 10,
2097 "name": "test",
2098 "ram": 1024,
2099 "vcpus": 2,
2100 "swap": 20,
2101 },
2102 },
2103 }
2104 target_flavor = {
2105 "id": "test_id",
2106 "name": "test",
2107 "storage-gb": "10",
2108 "memory-mb": "1024",
2109 "vcpu-count": "2",
2110 }
2111 indata = {
2112 "vnf": [
2113 {
2114 "vdur": [
2115 {
2116 "ns-flavor-id": "test_id",
2117 "virtual-storages": [
2118 {
2119 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2120 "size-of-storage": "20",
2121 },
2122 ],
2123 },
2124 ],
2125 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2126 },
2127 ],
2128 }
2129 vim_info = {}
2130 target_record_id = ""
2131
2132 epa_params.return_value = {}
2133
2134 result = Ns._process_flavor_params(
2135 target_flavor=target_flavor,
2136 indata=indata,
2137 vim_info=vim_info,
2138 target_record_id=target_record_id,
2139 )
2140
2141 self.assertTrue(epa_params.called)
2142 self.assertDictEqual(result, expected_result)
2143
2144 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2145 def test__process_flavor_params_with_persistent_root_disk(
2146 self,
2147 epa_params,
2148 ):
2149 kwargs = {
2150 "db": db,
2151 }
2152
2153 db.get_one.return_value = {
2154 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2155 "df": [
2156 {
2157 "id": "default-df",
2158 "vdu-profile": [
2159 {"id": "several_volumes-VM", "min-number-of-instances": 1}
2160 ],
2161 }
2162 ],
2163 "id": "several_volumes-vnf",
2164 "product-name": "several_volumes-vnf",
2165 "vdu": [
2166 {
2167 "id": "several_volumes-VM",
2168 "name": "several_volumes-VM",
2169 "sw-image-desc": "ubuntu20.04",
2170 "alternative-sw-image-desc": [
2171 "ubuntu20.04-aws",
2172 "ubuntu20.04-azure",
2173 ],
2174 "virtual-storage-desc": [
2175 "persistent-root-volume",
2176 ],
2177 }
2178 ],
2179 "version": "1.0",
2180 "virtual-storage-desc": [
2181 {
2182 "id": "persistent-root-volume",
2183 "type-of-storage": "persistent-storage:persistent-storage",
2184 "size-of-storage": "10",
2185 },
2186 ],
2187 "_admin": {
2188 "storage": {
2189 "fs": "mongo",
2190 "path": "/app/storage/",
2191 },
2192 "type": "vnfd",
2193 },
2194 }
2195 expected_result = {
2196 "find_params": {
2197 "flavor_data": {
2198 "disk": 0,
2199 "ram": 1024,
2200 "vcpus": 2,
2201 },
2202 },
2203 "params": {
2204 "flavor_data": {
2205 "disk": 0,
2206 "name": "test",
2207 "ram": 1024,
2208 "vcpus": 2,
2209 },
2210 },
2211 }
2212 target_flavor = {
2213 "id": "test_id",
2214 "name": "test",
2215 "storage-gb": "10",
2216 "memory-mb": "1024",
2217 "vcpu-count": "2",
2218 }
2219 indata = {
2220 "vnf": [
2221 {
2222 "vdur": [
2223 {
2224 "vdu-name": "several_volumes-VM",
2225 "ns-flavor-id": "test_id",
2226 "virtual-storages": [
2227 {
2228 "type-of-storage": "persistent-storage:persistent-storage",
2229 "size-of-storage": "10",
2230 },
2231 ],
2232 },
2233 ],
2234 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2235 },
2236 ],
2237 }
2238 vim_info = {}
2239 target_record_id = ""
2240
2241 epa_params.return_value = {}
2242
2243 result = Ns._process_flavor_params(
2244 target_flavor=target_flavor,
2245 indata=indata,
2246 vim_info=vim_info,
2247 target_record_id=target_record_id,
2248 **kwargs,
2249 )
2250
2251 self.assertTrue(epa_params.called)
2252 self.assertDictEqual(result, expected_result)
2253
2254 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2255 def test__process_flavor_params_with_epa_params(
2256 self,
2257 epa_params,
2258 ):
2259 expected_result = {
2260 "find_params": {
2261 "flavor_data": {
2262 "disk": 10,
2263 "ram": 1024,
2264 "vcpus": 2,
2265 "extended": {
2266 "numa": "there-is-numa-here",
2267 },
2268 },
2269 },
2270 "params": {
2271 "flavor_data": {
2272 "disk": 10,
2273 "name": "test",
2274 "ram": 1024,
2275 "vcpus": 2,
2276 "extended": {
2277 "numa": "there-is-numa-here",
2278 },
2279 },
2280 },
2281 }
2282 target_flavor = {
2283 "id": "test_id",
2284 "name": "test",
2285 "storage-gb": "10",
2286 "memory-mb": "1024",
2287 "vcpu-count": "2",
2288 }
2289 indata = {
2290 "vnf": [
2291 {
2292 "vdur": [
2293 {
2294 "ns-flavor-id": "test_id",
2295 },
2296 ],
2297 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2298 },
2299 ],
2300 }
2301 vim_info = {}
2302 target_record_id = ""
2303
2304 epa_params.return_value = {
2305 "numa": "there-is-numa-here",
2306 }
2307
2308 result = Ns._process_flavor_params(
2309 target_flavor=target_flavor,
2310 indata=indata,
2311 vim_info=vim_info,
2312 target_record_id=target_record_id,
2313 )
2314
2315 self.assertTrue(epa_params.called)
2316 self.assertDictEqual(result, expected_result)
2317
2318 @patch("osm_ng_ro.ns.Ns._process_epa_params")
2319 def test__process_flavor_params(
2320 self,
2321 epa_params,
2322 ):
2323 kwargs = {
2324 "db": db,
2325 }
2326
2327 db.get_one.return_value = {
2328 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2329 "df": [
2330 {
2331 "id": "default-df",
2332 "vdu-profile": [
2333 {"id": "without_volumes-VM", "min-number-of-instances": 1}
2334 ],
2335 }
2336 ],
2337 "id": "without_volumes-vnf",
2338 "product-name": "without_volumes-vnf",
2339 "vdu": [
2340 {
2341 "id": "without_volumes-VM",
2342 "name": "without_volumes-VM",
2343 "sw-image-desc": "ubuntu20.04",
2344 "alternative-sw-image-desc": [
2345 "ubuntu20.04-aws",
2346 "ubuntu20.04-azure",
2347 ],
2348 "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
2349 }
2350 ],
2351 "version": "1.0",
2352 "virtual-storage-desc": [
2353 {"id": "root-volume", "size-of-storage": "10"},
2354 {
2355 "id": "ephemeral-volume",
2356 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2357 "size-of-storage": "1",
2358 },
2359 ],
2360 "_admin": {
2361 "storage": {
2362 "fs": "mongo",
2363 "path": "/app/storage/",
2364 },
2365 "type": "vnfd",
2366 },
2367 }
2368
2369 expected_result = {
2370 "find_params": {
2371 "flavor_data": {
2372 "disk": 10,
2373 "ram": 1024,
2374 "vcpus": 2,
2375 "ephemeral": 10,
2376 "swap": 20,
2377 "extended": {
2378 "numa": "there-is-numa-here",
2379 },
2380 },
2381 },
2382 "params": {
2383 "flavor_data": {
2384 "disk": 10,
2385 "name": "test",
2386 "ram": 1024,
2387 "vcpus": 2,
2388 "ephemeral": 10,
2389 "swap": 20,
2390 "extended": {
2391 "numa": "there-is-numa-here",
2392 },
2393 },
2394 },
2395 }
2396 target_flavor = {
2397 "id": "test_id",
2398 "name": "test",
2399 "storage-gb": "10",
2400 "memory-mb": "1024",
2401 "vcpu-count": "2",
2402 }
2403 indata = {
2404 "vnf": [
2405 {
2406 "vdur": [
2407 {
2408 "ns-flavor-id": "test_id",
2409 "virtual-storages": [
2410 {
2411 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
2412 "size-of-storage": "10",
2413 },
2414 {
2415 "type-of-storage": "etsi-nfv-descriptors:swap-storage",
2416 "size-of-storage": "20",
2417 },
2418 ],
2419 },
2420 ],
2421 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
2422 },
2423 ],
2424 }
2425 vim_info = {}
2426 target_record_id = ""
2427
2428 epa_params.return_value = {
2429 "numa": "there-is-numa-here",
2430 }
2431
2432 result = Ns._process_flavor_params(
2433 target_flavor=target_flavor,
2434 indata=indata,
2435 vim_info=vim_info,
2436 target_record_id=target_record_id,
2437 **kwargs,
2438 )
2439
2440 self.assertTrue(epa_params.called)
2441 self.assertDictEqual(result, expected_result)
2442
2443 def test__process_net_params_with_empty_params(
2444 self,
2445 ):
2446 target_vld = {
2447 "name": "vld-name",
2448 }
2449 indata = {
2450 "name": "ns-name",
2451 }
2452 vim_info = {
2453 "provider_network": "some-profile-here",
2454 "ip_profile": {
2455 "some_ip_profile": "here",
2456 },
2457 }
2458 target_record_id = ""
2459 expected_result = {
2460 "params": {
2461 "net_name": "ns-name-vld-name",
2462 "net_type": "bridge",
2463 "ip_profile": {
2464 "some_ip_profile": "here",
2465 },
2466 "provider_network_profile": "some-profile-here",
2467 }
2468 }
2469
2470 result = Ns._process_net_params(
2471 target_vld=target_vld,
2472 indata=indata,
2473 vim_info=vim_info,
2474 target_record_id=target_record_id,
2475 )
2476
2477 self.assertDictEqual(expected_result, result)
2478
2479 def test__process_net_params_with_vim_info_sdn(
2480 self,
2481 ):
2482 target_vld = {
2483 "name": "vld-name",
2484 }
2485 indata = {
2486 "name": "ns-name",
2487 }
2488 vim_info = {
2489 "sdn": "some-sdn",
2490 "sdn-ports": ["some", "ports", "here"],
2491 "vlds": ["some", "vlds", "here"],
2492 "type": "sdn-type",
2493 }
2494 target_record_id = "vld.sdn.something"
2495 expected_result = {
2496 "params": {
2497 "sdn-ports": ["some", "ports", "here"],
2498 "vlds": ["some", "vlds", "here"],
2499 "type": "sdn-type",
2500 }
2501 }
2502
2503 result = Ns._process_net_params(
2504 target_vld=target_vld,
2505 indata=indata,
2506 vim_info=vim_info,
2507 target_record_id=target_record_id,
2508 )
2509
2510 self.assertDictEqual(expected_result, result)
2511
2512 def test__process_net_params_with_vim_info_sdn_target_vim(
2513 self,
2514 ):
2515 target_vld = {
2516 "name": "vld-name",
2517 }
2518 indata = {
2519 "name": "ns-name",
2520 }
2521 vim_info = {
2522 "sdn": "some-sdn",
2523 "sdn-ports": ["some", "ports", "here"],
2524 "vlds": ["some", "vlds", "here"],
2525 "target_vim": "some-vim",
2526 "type": "sdn-type",
2527 }
2528 target_record_id = "vld.sdn.something"
2529 expected_result = {
2530 "depends_on": ["some-vim vld.sdn"],
2531 "params": {
2532 "sdn-ports": ["some", "ports", "here"],
2533 "vlds": ["some", "vlds", "here"],
2534 "target_vim": "some-vim",
2535 "type": "sdn-type",
2536 },
2537 }
2538
2539 result = Ns._process_net_params(
2540 target_vld=target_vld,
2541 indata=indata,
2542 vim_info=vim_info,
2543 target_record_id=target_record_id,
2544 )
2545
2546 self.assertDictEqual(expected_result, result)
2547
2548 def test__process_net_params_with_vim_network_name(
2549 self,
2550 ):
2551 target_vld = {
2552 "name": "vld-name",
2553 }
2554 indata = {
2555 "name": "ns-name",
2556 }
2557 vim_info = {
2558 "vim_network_name": "some-network-name",
2559 }
2560 target_record_id = "vld.sdn.something"
2561 expected_result = {
2562 "find_params": {
2563 "filter_dict": {
2564 "name": "some-network-name",
2565 },
2566 },
2567 }
2568
2569 result = Ns._process_net_params(
2570 target_vld=target_vld,
2571 indata=indata,
2572 vim_info=vim_info,
2573 target_record_id=target_record_id,
2574 )
2575
2576 self.assertDictEqual(expected_result, result)
2577
2578 def test__process_net_params_with_vim_network_id(
2579 self,
2580 ):
2581 target_vld = {
2582 "name": "vld-name",
2583 }
2584 indata = {
2585 "name": "ns-name",
2586 }
2587 vim_info = {
2588 "vim_network_id": "some-network-id",
2589 }
2590 target_record_id = "vld.sdn.something"
2591 expected_result = {
2592 "find_params": {
2593 "filter_dict": {
2594 "id": "some-network-id",
2595 },
2596 },
2597 }
2598
2599 result = Ns._process_net_params(
2600 target_vld=target_vld,
2601 indata=indata,
2602 vim_info=vim_info,
2603 target_record_id=target_record_id,
2604 )
2605
2606 self.assertDictEqual(expected_result, result)
2607
2608 def test__process_net_params_with_mgmt_network(
2609 self,
2610 ):
2611 target_vld = {
2612 "id": "vld-id",
2613 "name": "vld-name",
2614 "mgmt-network": "some-mgmt-network",
2615 }
2616 indata = {
2617 "name": "ns-name",
2618 }
2619 vim_info = {}
2620 target_record_id = "vld.sdn.something"
2621 expected_result = {
2622 "find_params": {
2623 "mgmt": True,
2624 "name": "vld-id",
2625 },
2626 }
2627
2628 result = Ns._process_net_params(
2629 target_vld=target_vld,
2630 indata=indata,
2631 vim_info=vim_info,
2632 target_record_id=target_record_id,
2633 )
2634
2635 self.assertDictEqual(expected_result, result)
2636
2637 def test__process_net_params_with_underlay_eline(
2638 self,
2639 ):
2640 target_vld = {
2641 "name": "vld-name",
2642 "underlay": "some-underlay-here",
2643 "type": "ELINE",
2644 }
2645 indata = {
2646 "name": "ns-name",
2647 }
2648 vim_info = {
2649 "provider_network": "some-profile-here",
2650 "ip_profile": {
2651 "some_ip_profile": "here",
2652 },
2653 }
2654 target_record_id = ""
2655 expected_result = {
2656 "params": {
2657 "ip_profile": {
2658 "some_ip_profile": "here",
2659 },
2660 "net_name": "ns-name-vld-name",
2661 "net_type": "ptp",
2662 "provider_network_profile": "some-profile-here",
2663 }
2664 }
2665
2666 result = Ns._process_net_params(
2667 target_vld=target_vld,
2668 indata=indata,
2669 vim_info=vim_info,
2670 target_record_id=target_record_id,
2671 )
2672
2673 self.assertDictEqual(expected_result, result)
2674
2675 def test__process_net_params_with_underlay_elan(
2676 self,
2677 ):
2678 target_vld = {
2679 "name": "vld-name",
2680 "underlay": "some-underlay-here",
2681 "type": "ELAN",
2682 }
2683 indata = {
2684 "name": "ns-name",
2685 }
2686 vim_info = {
2687 "provider_network": "some-profile-here",
2688 "ip_profile": {
2689 "some_ip_profile": "here",
2690 },
2691 }
2692 target_record_id = ""
2693 expected_result = {
2694 "params": {
2695 "ip_profile": {
2696 "some_ip_profile": "here",
2697 },
2698 "net_name": "ns-name-vld-name",
2699 "net_type": "data",
2700 "provider_network_profile": "some-profile-here",
2701 }
2702 }
2703
2704 result = Ns._process_net_params(
2705 target_vld=target_vld,
2706 indata=indata,
2707 vim_info=vim_info,
2708 target_record_id=target_record_id,
2709 )
2710
2711 self.assertDictEqual(expected_result, result)
2712
2713 def test__get_cloud_init_exception(self):
2714 db_mock = MagicMock(name="database mock")
2715 fs_mock = None
2716
2717 location = ""
2718
2719 with self.assertRaises(NsException):
2720 Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
2721
2722 def test__get_cloud_init_file_fs_exception(self):
2723 db_mock = MagicMock(name="database mock")
2724 fs_mock = None
2725
2726 location = "vnfr_id_123456:file:test_file"
2727 db_mock.get_one.return_value = {
2728 "_admin": {
2729 "storage": {
2730 "folder": "/home/osm",
2731 "pkg-dir": "vnfr_test_dir",
2732 },
2733 },
2734 }
2735
2736 with self.assertRaises(NsException):
2737 Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
2738
2739 def test__get_cloud_init_file(self):
2740 db_mock = MagicMock(name="database mock")
2741 fs_mock = MagicMock(name="filesystem mock")
2742 file_mock = MagicMock(name="file mock")
2743
2744 location = "vnfr_id_123456:file:test_file"
2745 cloud_init_content = "this is a cloud init file content"
2746
2747 db_mock.get_one.return_value = {
2748 "_admin": {
2749 "storage": {
2750 "folder": "/home/osm",
2751 "pkg-dir": "vnfr_test_dir",
2752 },
2753 },
2754 }
2755 fs_mock.file_open.return_value = file_mock
2756 file_mock.__enter__.return_value.read.return_value = cloud_init_content
2757
2758 result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
2759
2760 self.assertEqual(cloud_init_content, result)
2761
2762 def test__get_cloud_init_vdu(self):
2763 db_mock = MagicMock(name="database mock")
2764 fs_mock = None
2765
2766 location = "vnfr_id_123456:vdu:0"
2767 cloud_init_content = "this is a cloud init file content"
2768
2769 db_mock.get_one.return_value = {
2770 "vdu": {
2771 0: {
2772 "cloud-init": cloud_init_content,
2773 },
2774 },
2775 }
2776
2777 result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location)
2778
2779 self.assertEqual(cloud_init_content, result)
2780
2781 @patch("jinja2.Environment.__init__")
2782 def test__parse_jinja2_undefined_error(self, env_mock: Mock):
2783 cloud_init_content = None
2784 params = None
2785 context = None
2786
2787 env_mock.side_effect = UndefinedError("UndefinedError occurred.")
2788
2789 with self.assertRaises(NsException):
2790 Ns._parse_jinja2(
2791 cloud_init_content=cloud_init_content, params=params, context=context
2792 )
2793
2794 @patch("jinja2.Environment.__init__")
2795 def test__parse_jinja2_template_error(self, env_mock: Mock):
2796 cloud_init_content = None
2797 params = None
2798 context = None
2799
2800 env_mock.side_effect = TemplateError("TemplateError occurred.")
2801
2802 with self.assertRaises(NsException):
2803 Ns._parse_jinja2(
2804 cloud_init_content=cloud_init_content, params=params, context=context
2805 )
2806
2807 @patch("jinja2.Environment.__init__")
2808 def test__parse_jinja2_template_not_found(self, env_mock: Mock):
2809 cloud_init_content = None
2810 params = None
2811 context = None
2812
2813 env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")
2814
2815 with self.assertRaises(NsException):
2816 Ns._parse_jinja2(
2817 cloud_init_content=cloud_init_content, params=params, context=context
2818 )
2819
2820 def test_rendering_jinja2_temp_without_special_characters(self):
2821 cloud_init_content = """
2822 disk_setup:
2823 ephemeral0:
2824 table_type: {{type}}
2825 layout: True
2826 overwrite: {{is_override}}
2827 runcmd:
2828 - [ ls, -l, / ]
2829 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2830 """
2831 params = {
2832 "type": "mbr",
2833 "is_override": "False",
2834 "command": "; mkdir abc",
2835 }
2836 context = "cloud-init for VM"
2837 expected_result = """
2838 disk_setup:
2839 ephemeral0:
2840 table_type: mbr
2841 layout: True
2842 overwrite: False
2843 runcmd:
2844 - [ ls, -l, / ]
2845 - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
2846 """
2847 result = Ns._parse_jinja2(
2848 cloud_init_content=cloud_init_content, params=params, context=context
2849 )
2850 self.assertEqual(result, expected_result)
2851
2852 def test_rendering_jinja2_temp_with_special_characters(self):
2853 cloud_init_content = """
2854 disk_setup:
2855 ephemeral0:
2856 table_type: {{type}}
2857 layout: True
2858 overwrite: {{is_override}}
2859 runcmd:
2860 - [ ls, -l, / ]
2861 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2862 """
2863 params = {
2864 "type": "mbr",
2865 "is_override": "False",
2866 "command": "& rm -rf",
2867 }
2868 context = "cloud-init for VM"
2869 expected_result = """
2870 disk_setup:
2871 ephemeral0:
2872 table_type: mbr
2873 layout: True
2874 overwrite: False
2875 runcmd:
2876 - [ ls, -l, / ]
2877 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2878 """
2879 result = Ns._parse_jinja2(
2880 cloud_init_content=cloud_init_content, params=params, context=context
2881 )
2882 self.assertNotEqual(result, expected_result)
2883
2884 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self):
2885 with patch("osm_ng_ro.ns.Environment") as mock_environment:
2886 mock_environment.return_value = Environment(
2887 undefined=StrictUndefined,
2888 autoescape=select_autoescape(default_for_string=False, default=False),
2889 )
2890 cloud_init_content = """
2891 disk_setup:
2892 ephemeral0:
2893 table_type: {{type}}
2894 layout: True
2895 overwrite: {{is_override}}
2896 runcmd:
2897 - [ ls, -l, / ]
2898 - [ sh, -xc, "echo $(date) '{{command}}'" ]
2899 """
2900 params = {
2901 "type": "mbr",
2902 "is_override": "False",
2903 "command": "& rm -rf /",
2904 }
2905 context = "cloud-init for VM"
2906 expected_result = """
2907 disk_setup:
2908 ephemeral0:
2909 table_type: mbr
2910 layout: True
2911 overwrite: False
2912 runcmd:
2913 - [ ls, -l, / ]
2914 - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
2915 """
2916 result = Ns._parse_jinja2(
2917 cloud_init_content=cloud_init_content,
2918 params=params,
2919 context=context,
2920 )
2921 self.assertEqual(result, expected_result)
2922
2923 @patch("osm_ng_ro.ns.Ns._assign_vim")
2924 def test__rebuild_start_stop_task(self, assign_vim):
2925 self.ns = Ns()
2926 extra_dict = {}
2927 actions = ["start", "stop", "rebuild"]
2928 vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
2929 vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
2930 vdu_index = "0"
2931 action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
2932 nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
2933 task_index = 0
2934 target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
2935 t = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
2936 for action in actions:
2937 expected_result = {
2938 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
2939 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
2940 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
2941 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
2942 "status": "SCHEDULED",
2943 "action": "EXEC",
2944 "item": "update",
2945 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0",
2946 "target_record_id": t,
2947 "params": {
2948 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
2949 "action": action,
2950 },
2951 }
2952 extra_dict["params"] = {
2953 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
2954 "action": action,
2955 }
2956 task = self.ns.rebuild_start_stop_task(
2957 vdu_id,
2958 vnf_id,
2959 vdu_index,
2960 action_id,
2961 nsr_id,
2962 task_index,
2963 target_vim,
2964 extra_dict,
2965 )
2966 self.assertEqual(task.get("action_id"), action_id)
2967 self.assertEqual(task.get("nsr_id"), nsr_id)
2968 self.assertEqual(task.get("target_id"), target_vim)
2969 self.assertDictEqual(task, expected_result)
2970
2971 @patch("osm_ng_ro.ns.Ns._assign_vim")
2972 def test_verticalscale_task(self, assign_vim):
2973 self.ns = Ns()
2974 extra_dict = {}
2975 vdu_index = "1"
2976 action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
2977 nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
2978 task_index = 1
2979 target_record_id = (
2980 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
2981 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
2982 )
2983
2984 expected_result = {
2985 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
2986 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
2987 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
2988 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
2989 "status": "SCHEDULED",
2990 "action": "EXEC",
2991 "item": "verticalscale",
2992 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
2993 "target_record_id": target_record_id,
2994 "params": {
2995 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
2996 "flavor_dict": "flavor_dict",
2997 },
2998 }
2999 vdu = {
3000 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3001 "vim_info": {
3002 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3003 },
3004 }
3005 vnf = {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3006 extra_dict["params"] = {
3007 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3008 "flavor_dict": "flavor_dict",
3009 }
3010 task = self.ns.verticalscale_task(
3011 vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
3012 )
3013
3014 self.assertDictEqual(task, expected_result)
3015
3016 @patch("osm_ng_ro.ns.Ns._assign_vim")
3017 def test_migrate_task(self, assign_vim):
3018 self.ns = Ns()
3019 extra_dict = {}
3020 vdu_index = "1"
3021 action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
3022 nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
3023 task_index = 1
3024 target_record_id = (
3025 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
3026 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
3027 )
3028
3029 expected_result = {
3030 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
3031 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
3032 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
3033 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
3034 "status": "SCHEDULED",
3035 "action": "EXEC",
3036 "item": "migrate",
3037 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1",
3038 "target_record_id": target_record_id,
3039 "params": {
3040 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3041 "migrate_host": "migrateToHost",
3042 },
3043 }
3044 vdu = {
3045 "id": "bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
3046 "vim_info": {
3047 "vim:f9f370ac-0d44-41a7-9000-457f2332bc35": {"interfaces": []}
3048 },
3049 }
3050 vnf = {"_id": "665b4165-ce24-4320-bf19-b9a45bade49f"}
3051 extra_dict["params"] = {
3052 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
3053 "migrate_host": "migrateToHost",
3054 }
3055 task = self.ns.migrate_task(
3056 vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
3057 )
3058
3059 self.assertDictEqual(task, expected_result)
3060
3061
3062 class TestProcessVduParams(unittest.TestCase):
3063 def setUp(self):
3064 self.ns = Ns()
3065 self.logger = CopyingMock(autospec=True)
3066
3067 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3068 def test_find_persistent_root_volumes_empty_instantiation_vol_list(
3069 self, mock_volume_keeping_required
3070 ):
3071 """Find persistent root volume, instantiation_vol_list is empty."""
3072 vnfd = deepcopy(vnfd_wth_persistent_storage)
3073 target_vdu = target_vdu_wth_persistent_storage
3074 vdu_instantiation_volumes_list = []
3075 disk_list = []
3076 mock_volume_keeping_required.return_value = True
3077 expected_root_disk = {
3078 "id": "persistent-root-volume",
3079 "type-of-storage": "persistent-storage:persistent-storage",
3080 "size-of-storage": "10",
3081 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
3082 }
3083 expected_persist_root_disk = {
3084 "persistent-root-volume": {
3085 "image_id": "ubuntu20.04",
3086 "size": "10",
3087 "keep": True,
3088 }
3089 }
3090 expected_disk_list = [
3091 {
3092 "image_id": "ubuntu20.04",
3093 "size": "10",
3094 "keep": True,
3095 },
3096 ]
3097 persist_root_disk = self.ns.find_persistent_root_volumes(
3098 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3099 )
3100 self.assertEqual(persist_root_disk, expected_persist_root_disk)
3101 mock_volume_keeping_required.assert_called_once_with(expected_root_disk)
3102 self.assertEqual(disk_list, expected_disk_list)
3103 self.assertEqual(len(disk_list), 1)
3104
3105 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3106 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
3107 self, mock_volume_keeping_required
3108 ):
3109 """Find persistent root volume, always selects the first vsd as root volume."""
3110 vnfd = deepcopy(vnfd_wth_persistent_storage)
3111 vnfd["vdu"][0]["virtual-storage-desc"] = [
3112 "persistent-volume2",
3113 "persistent-root-volume",
3114 "ephemeral-volume",
3115 ]
3116 target_vdu = target_vdu_wth_persistent_storage
3117 vdu_instantiation_volumes_list = []
3118 disk_list = []
3119 mock_volume_keeping_required.return_value = True
3120 expected_root_disk = {
3121 "id": "persistent-volume2",
3122 "type-of-storage": "persistent-storage:persistent-storage",
3123 "size-of-storage": "10",
3124 }
3125 expected_persist_root_disk = {
3126 "persistent-volume2": {
3127 "image_id": "ubuntu20.04",
3128 "size": "10",
3129 "keep": True,
3130 }
3131 }
3132 expected_disk_list = [
3133 {
3134 "image_id": "ubuntu20.04",
3135 "size": "10",
3136 "keep": True,
3137 },
3138 ]
3139 persist_root_disk = self.ns.find_persistent_root_volumes(
3140 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3141 )
3142 self.assertEqual(persist_root_disk, expected_persist_root_disk)
3143 mock_volume_keeping_required.assert_called_once_with(expected_root_disk)
3144 self.assertEqual(disk_list, expected_disk_list)
3145 self.assertEqual(len(disk_list), 1)
3146
3147 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3148 def test_find_persistent_root_volumes_empty_size_of_storage(
3149 self, mock_volume_keeping_required
3150 ):
3151 """Find persistent root volume, size of storage is empty."""
3152 vnfd = deepcopy(vnfd_wth_persistent_storage)
3153 vnfd["virtual-storage-desc"][0]["size-of-storage"] = ""
3154 vnfd["vdu"][0]["virtual-storage-desc"] = [
3155 "persistent-volume2",
3156 "persistent-root-volume",
3157 "ephemeral-volume",
3158 ]
3159 target_vdu = target_vdu_wth_persistent_storage
3160 vdu_instantiation_volumes_list = []
3161 disk_list = []
3162 persist_root_disk = self.ns.find_persistent_root_volumes(
3163 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3164 )
3165 self.assertEqual(persist_root_disk, None)
3166 mock_volume_keeping_required.assert_not_called()
3167 self.assertEqual(disk_list, [])
3168
3169 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3170 def test_find_persistent_root_volumes_keeping_is_not_required(
3171 self, mock_volume_keeping_required
3172 ):
3173 """Find persistent root volume, volume keeping is not required."""
3174 vnfd = deepcopy(vnfd_wth_persistent_storage)
3175 vnfd["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
3176 {"key": "keep-volume", "value": "false"},
3177 ]
3178 target_vdu = target_vdu_wth_persistent_storage
3179 vdu_instantiation_volumes_list = []
3180 disk_list = []
3181 mock_volume_keeping_required.return_value = False
3182 expected_root_disk = {
3183 "id": "persistent-root-volume",
3184 "type-of-storage": "persistent-storage:persistent-storage",
3185 "size-of-storage": "10",
3186 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
3187 }
3188 expected_persist_root_disk = {
3189 "persistent-root-volume": {
3190 "image_id": "ubuntu20.04",
3191 "size": "10",
3192 "keep": False,
3193 }
3194 }
3195 expected_disk_list = [
3196 {
3197 "image_id": "ubuntu20.04",
3198 "size": "10",
3199 "keep": False,
3200 },
3201 ]
3202 persist_root_disk = self.ns.find_persistent_root_volumes(
3203 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3204 )
3205 self.assertEqual(persist_root_disk, expected_persist_root_disk)
3206 mock_volume_keeping_required.assert_called_once_with(expected_root_disk)
3207 self.assertEqual(disk_list, expected_disk_list)
3208 self.assertEqual(len(disk_list), 1)
3209
3210 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3211 def test_find_persistent_root_volumes_target_vdu_mismatch(
3212 self, mock_volume_keeping_required
3213 ):
3214 """Find persistent root volume, target vdu name is not matching."""
3215 vnfd = deepcopy(vnfd_wth_persistent_storage)
3216 vnfd["vdu"][0]["name"] = "Several_Volumes-VM"
3217 target_vdu = target_vdu_wth_persistent_storage
3218 vdu_instantiation_volumes_list = []
3219 disk_list = []
3220 result = self.ns.find_persistent_root_volumes(
3221 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3222 )
3223 self.assertEqual(result, None)
3224 mock_volume_keeping_required.assert_not_called()
3225 self.assertEqual(disk_list, [])
3226 self.assertEqual(len(disk_list), 0)
3227
3228 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3229 def test_find_persistent_root_volumes_with_instantiation_vol_list(
3230 self, mock_volume_keeping_required
3231 ):
3232 """Find persistent root volume, existing volume needs to be used."""
3233 vnfd = deepcopy(vnfd_wth_persistent_storage)
3234 target_vdu = target_vdu_wth_persistent_storage
3235 vdu_instantiation_volumes_list = [
3236 {
3237 "vim-volume-id": vim_volume_id,
3238 "name": "persistent-root-volume",
3239 }
3240 ]
3241 disk_list = []
3242 expected_persist_root_disk = {
3243 "persistent-root-volume": {
3244 "vim_volume_id": vim_volume_id,
3245 "image_id": "ubuntu20.04",
3246 },
3247 }
3248 expected_disk_list = [
3249 {
3250 "vim_volume_id": vim_volume_id,
3251 "image_id": "ubuntu20.04",
3252 },
3253 ]
3254 persist_root_disk = self.ns.find_persistent_root_volumes(
3255 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3256 )
3257 self.assertEqual(persist_root_disk, expected_persist_root_disk)
3258 mock_volume_keeping_required.assert_not_called()
3259 self.assertEqual(disk_list, expected_disk_list)
3260 self.assertEqual(len(disk_list), 1)
3261
3262 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3263 def test_find_persistent_root_volumes_invalid_instantiation_params(
3264 self, mock_volume_keeping_required
3265 ):
3266 """Find persistent root volume, existing volume id keyword is invalid."""
3267 vnfd = deepcopy(vnfd_wth_persistent_storage)
3268 target_vdu = target_vdu_wth_persistent_storage
3269 vdu_instantiation_volumes_list = [
3270 {
3271 "volume-id": vim_volume_id,
3272 "name": "persistent-root-volume",
3273 }
3274 ]
3275 disk_list = []
3276 with self.assertRaises(KeyError):
3277 self.ns.find_persistent_root_volumes(
3278 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
3279 )
3280 mock_volume_keeping_required.assert_not_called()
3281 self.assertEqual(disk_list, [])
3282 self.assertEqual(len(disk_list), 0)
3283
3284 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3285 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
3286 self, mock_volume_keeping_required
3287 ):
3288 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty."""
3289 persistent_root_disk = {
3290 "persistent-root-volume": {
3291 "image_id": "ubuntu20.04",
3292 "size": "10",
3293 "keep": False,
3294 }
3295 }
3296 mock_volume_keeping_required.return_value = False
3297 target_vdu = target_vdu_wth_persistent_storage
3298 vdu_instantiation_volumes_list = []
3299 disk_list = [
3300 {
3301 "image_id": "ubuntu20.04",
3302 "size": "10",
3303 "keep": False,
3304 },
3305 ]
3306 expected_disk = {
3307 "id": "persistent-volume2",
3308 "size-of-storage": "10",
3309 "type-of-storage": "persistent-storage:persistent-storage",
3310 }
3311 expected_disk_list = [
3312 {
3313 "image_id": "ubuntu20.04",
3314 "size": "10",
3315 "keep": False,
3316 },
3317 {
3318 "size": "10",
3319 "keep": False,
3320 },
3321 ]
3322 self.ns.find_persistent_volumes(
3323 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
3324 )
3325 self.assertEqual(disk_list, expected_disk_list)
3326 mock_volume_keeping_required.assert_called_once_with(expected_disk)
3327
3328 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3329 def test_find_persistent_volumes_vdu_wth_inst_vol_list(
3330 self, mock_volume_keeping_required
3331 ):
3332 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter."""
3333 persistent_root_disk = {
3334 "persistent-root-volume": {
3335 "image_id": "ubuntu20.04",
3336 "size": "10",
3337 "keep": False,
3338 }
3339 }
3340 vdu_instantiation_volumes_list = [
3341 {
3342 "vim-volume-id": vim_volume_id,
3343 "name": "persistent-volume2",
3344 }
3345 ]
3346 target_vdu = target_vdu_wth_persistent_storage
3347 disk_list = [
3348 {
3349 "image_id": "ubuntu20.04",
3350 "size": "10",
3351 "keep": False,
3352 },
3353 ]
3354 expected_disk_list = [
3355 {
3356 "image_id": "ubuntu20.04",
3357 "size": "10",
3358 "keep": False,
3359 },
3360 {
3361 "vim_volume_id": vim_volume_id,
3362 },
3363 ]
3364 self.ns.find_persistent_volumes(
3365 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
3366 )
3367 self.assertEqual(disk_list, expected_disk_list)
3368 mock_volume_keeping_required.assert_not_called()
3369
3370 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3371 def test_find_persistent_volumes_vdu_wthout_persistent_storage(
3372 self, mock_volume_keeping_required
3373 ):
3374 """Find persistent ordinary volume, there is not any persistent disk."""
3375 persistent_root_disk = {}
3376 vdu_instantiation_volumes_list = []
3377 mock_volume_keeping_required.return_value = False
3378 target_vdu = target_vdu_wthout_persistent_storage
3379 disk_list = []
3380 self.ns.find_persistent_volumes(
3381 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
3382 )
3383 self.assertEqual(disk_list, disk_list)
3384 mock_volume_keeping_required.assert_not_called()
3385
3386 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3387 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
3388 self, mock_volume_keeping_required
3389 ):
3390 """There is persistent root disk, but there is not ordinary persistent disk."""
3391 persistent_root_disk = {
3392 "persistent-root-volume": {
3393 "image_id": "ubuntu20.04",
3394 "size": "10",
3395 "keep": False,
3396 }
3397 }
3398 vdu_instantiation_volumes_list = []
3399 mock_volume_keeping_required.return_value = False
3400 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3401 target_vdu["virtual-storages"] = [
3402 {
3403 "id": "persistent-root-volume",
3404 "size-of-storage": "10",
3405 "type-of-storage": "persistent-storage:persistent-storage",
3406 "vdu-storage-requirements": [
3407 {"key": "keep-volume", "value": "true"},
3408 ],
3409 },
3410 {
3411 "id": "ephemeral-volume",
3412 "size-of-storage": "1",
3413 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
3414 },
3415 ]
3416 disk_list = [
3417 {
3418 "image_id": "ubuntu20.04",
3419 "size": "10",
3420 "keep": False,
3421 },
3422 ]
3423 self.ns.find_persistent_volumes(
3424 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
3425 )
3426 self.assertEqual(disk_list, disk_list)
3427 mock_volume_keeping_required.assert_not_called()
3428
3429 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
3430 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
3431 self, mock_volume_keeping_required
3432 ):
3433 """Find persistent ordinary volume, volume id is not persistent_root_disk dict,
3434 vim-volume-id is given as instantiation parameter but disk id is not matching.
3435 """
3436 mock_volume_keeping_required.return_value = True
3437 vdu_instantiation_volumes_list = [
3438 {
3439 "vim-volume-id": vim_volume_id,
3440 "name": "persistent-volume3",
3441 }
3442 ]
3443 persistent_root_disk = {
3444 "persistent-root-volume": {
3445 "image_id": "ubuntu20.04",
3446 "size": "10",
3447 "keep": False,
3448 }
3449 }
3450 disk_list = [
3451 {
3452 "image_id": "ubuntu20.04",
3453 "size": "10",
3454 "keep": False,
3455 },
3456 ]
3457 expected_disk_list = [
3458 {
3459 "image_id": "ubuntu20.04",
3460 "size": "10",
3461 "keep": False,
3462 },
3463 {
3464 "size": "10",
3465 "keep": True,
3466 },
3467 ]
3468 expected_disk = {
3469 "id": "persistent-volume2",
3470 "size-of-storage": "10",
3471 "type-of-storage": "persistent-storage:persistent-storage",
3472 }
3473 target_vdu = target_vdu_wth_persistent_storage
3474 self.ns.find_persistent_volumes(
3475 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
3476 )
3477 self.assertEqual(disk_list, expected_disk_list)
3478 mock_volume_keeping_required.assert_called_once_with(expected_disk)
3479
3480 def test_is_volume_keeping_required_true(self):
3481 """Volume keeping is required."""
3482 virtual_storage_descriptor = {
3483 "id": "persistent-root-volume",
3484 "type-of-storage": "persistent-storage:persistent-storage",
3485 "size-of-storage": "10",
3486 "vdu-storage-requirements": [
3487 {"key": "keep-volume", "value": "true"},
3488 ],
3489 }
3490 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor)
3491 self.assertEqual(result, True)
3492
3493 def test_is_volume_keeping_required_false(self):
3494 """Volume keeping is not required."""
3495 virtual_storage_descriptor = {
3496 "id": "persistent-root-volume",
3497 "type-of-storage": "persistent-storage:persistent-storage",
3498 "size-of-storage": "10",
3499 "vdu-storage-requirements": [
3500 {"key": "keep-volume", "value": "false"},
3501 ],
3502 }
3503 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor)
3504 self.assertEqual(result, False)
3505
3506 def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
3507 """Volume keeping is not required, vdu-storage-requirements key does not exist."""
3508 virtual_storage_descriptor = {
3509 "id": "persistent-root-volume",
3510 "type-of-storage": "persistent-storage:persistent-storage",
3511 "size-of-storage": "10",
3512 }
3513 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor)
3514 self.assertEqual(result, False)
3515
3516 def test_is_volume_keeping_required_wrong_keyword(self):
3517 """vdu-storage-requirements key to indicate keeping-volume is wrong."""
3518 virtual_storage_descriptor = {
3519 "id": "persistent-root-volume",
3520 "type-of-storage": "persistent-storage:persistent-storage",
3521 "size-of-storage": "10",
3522 "vdu-storage-requirements": [
3523 {"key": "hold-volume", "value": "true"},
3524 ],
3525 }
3526 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor)
3527 self.assertEqual(result, False)
3528
3529 def test_sort_vdu_interfaces_position_all_wth_positions(self):
3530 """Interfaces are sorted according to position, all have positions."""
3531 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3532 target_vdu["interfaces"] = [
3533 {
3534 "name": "vdu-eth1",
3535 "ns-vld-id": "datanet",
3536 "position": 2,
3537 },
3538 {
3539 "name": "vdu-eth0",
3540 "ns-vld-id": "mgmtnet",
3541 "position": 1,
3542 },
3543 ]
3544 sorted_interfaces = [
3545 {
3546 "name": "vdu-eth0",
3547 "ns-vld-id": "mgmtnet",
3548 "position": 1,
3549 },
3550 {
3551 "name": "vdu-eth1",
3552 "ns-vld-id": "datanet",
3553 "position": 2,
3554 },
3555 ]
3556 self.ns._sort_vdu_interfaces(target_vdu)
3557 self.assertEqual(target_vdu["interfaces"], sorted_interfaces)
3558
3559 def test_sort_vdu_interfaces_position_some_wth_position(self):
3560 """Interfaces are sorted according to position, some of them have positions."""
3561 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3562 target_vdu["interfaces"] = [
3563 {
3564 "name": "vdu-eth0",
3565 "ns-vld-id": "mgmtnet",
3566 },
3567 {
3568 "name": "vdu-eth1",
3569 "ns-vld-id": "datanet",
3570 "position": 1,
3571 },
3572 ]
3573 sorted_interfaces = [
3574 {
3575 "name": "vdu-eth1",
3576 "ns-vld-id": "datanet",
3577 "position": 1,
3578 },
3579 {
3580 "name": "vdu-eth0",
3581 "ns-vld-id": "mgmtnet",
3582 },
3583 ]
3584 self.ns._sort_vdu_interfaces(target_vdu)
3585 self.assertEqual(target_vdu["interfaces"], sorted_interfaces)
3586
3587 def test_sort_vdu_interfaces_position_empty_interface_list(self):
3588 """Interface list is empty."""
3589 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3590 target_vdu["interfaces"] = []
3591 sorted_interfaces = []
3592 self.ns._sort_vdu_interfaces(target_vdu)
3593 self.assertEqual(target_vdu["interfaces"], sorted_interfaces)
3594
3595 def test_partially_locate_vdu_interfaces(self):
3596 """Some interfaces have positions."""
3597 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3598 target_vdu["interfaces"] = [
3599 {
3600 "name": "vdu-eth1",
3601 "ns-vld-id": "net1",
3602 },
3603 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3604 {
3605 "name": "vdu-eth3",
3606 "ns-vld-id": "mgmtnet",
3607 },
3608 {
3609 "name": "vdu-eth1",
3610 "ns-vld-id": "datanet",
3611 "position": 1,
3612 },
3613 ]
3614 self.ns._partially_locate_vdu_interfaces(target_vdu)
3615 self.assertDictEqual(
3616 target_vdu["interfaces"][0],
3617 {
3618 "name": "vdu-eth1",
3619 "ns-vld-id": "datanet",
3620 "position": 1,
3621 },
3622 )
3623 self.assertDictEqual(
3624 target_vdu["interfaces"][2],
3625 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3626 )
3627
3628 def test_partially_locate_vdu_interfaces_position_start_from_0(self):
3629 """Some interfaces have positions, position start from 0."""
3630 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3631 target_vdu["interfaces"] = [
3632 {
3633 "name": "vdu-eth1",
3634 "ns-vld-id": "net1",
3635 },
3636 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3637 {
3638 "name": "vdu-eth3",
3639 "ns-vld-id": "mgmtnet",
3640 },
3641 {
3642 "name": "vdu-eth1",
3643 "ns-vld-id": "datanet",
3644 "position": 0,
3645 },
3646 ]
3647 self.ns._partially_locate_vdu_interfaces(target_vdu)
3648 self.assertDictEqual(
3649 target_vdu["interfaces"][0],
3650 {
3651 "name": "vdu-eth1",
3652 "ns-vld-id": "datanet",
3653 "position": 0,
3654 },
3655 )
3656 self.assertDictEqual(
3657 target_vdu["interfaces"][3],
3658 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3},
3659 )
3660
3661 def test_partially_locate_vdu_interfaces_wthout_position(self):
3662 """Interfaces do not have positions."""
3663 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3664 target_vdu["interfaces"] = interfaces_wthout_positions
3665 expected_result = deepcopy(target_vdu["interfaces"])
3666 self.ns._partially_locate_vdu_interfaces(target_vdu)
3667 self.assertEqual(target_vdu["interfaces"], expected_result)
3668
3669 def test_partially_locate_vdu_interfaces_all_has_position(self):
3670 """All interfaces have position."""
3671 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3672 target_vdu["interfaces"] = interfaces_wth_all_positions
3673 expected_interfaces = [
3674 {
3675 "name": "vdu-eth2",
3676 "ns-vld-id": "net2",
3677 "position": 0,
3678 },
3679 {
3680 "name": "vdu-eth3",
3681 "ns-vld-id": "mgmtnet",
3682 "position": 1,
3683 },
3684 {
3685 "name": "vdu-eth1",
3686 "ns-vld-id": "net1",
3687 "position": 2,
3688 },
3689 ]
3690 self.ns._partially_locate_vdu_interfaces(target_vdu)
3691 self.assertEqual(target_vdu["interfaces"], expected_interfaces)
3692
3693 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3694 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3695 def test_prepare_vdu_cloud_init(self, mock_parse_jinja2, mock_get_cloud_init):
3696 """Target_vdu has cloud-init and boot-data-drive."""
3697 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3698 target_vdu["cloud-init"] = "sample-cloud-init-path"
3699 target_vdu["boot-data-drive"] = "vda"
3700 vdu2cloud_init = {}
3701 mock_get_cloud_init.return_value = cloud_init_content
3702 mock_parse_jinja2.return_value = user_data
3703 expected_result = {
3704 "user-data": user_data,
3705 "boot-data-drive": "vda",
3706 }
3707 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
3708 self.assertDictEqual(result, expected_result)
3709 mock_get_cloud_init.assert_called_once_with(
3710 db=db, fs=fs, location="sample-cloud-init-path"
3711 )
3712 mock_parse_jinja2.assert_called_once_with(
3713 cloud_init_content=cloud_init_content,
3714 params=None,
3715 context="sample-cloud-init-path",
3716 )
3717
3718 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3719 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3720 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
3721 self, mock_parse_jinja2, mock_get_cloud_init
3722 ):
3723 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception."""
3724 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3725 target_vdu["cloud-init"] = "sample-cloud-init-path"
3726 target_vdu["boot-data-drive"] = "vda"
3727 vdu2cloud_init = {}
3728 mock_get_cloud_init.side_effect = NsException(
3729 "Mismatch descriptor for cloud init."
3730 )
3731
3732 with self.assertRaises(NsException) as err:
3733 self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
3734 self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")
3735
3736 mock_get_cloud_init.assert_called_once_with(
3737 db=db, fs=fs, location="sample-cloud-init-path"
3738 )
3739 mock_parse_jinja2.assert_not_called()
3740
3741 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3742 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3743 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
3744 self, mock_parse_jinja2, mock_get_cloud_init
3745 ):
3746 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception."""
3747 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3748 target_vdu["cloud-init"] = "sample-cloud-init-path"
3749 target_vdu["boot-data-drive"] = "vda"
3750 vdu2cloud_init = {}
3751 mock_get_cloud_init.return_value = cloud_init_content
3752 mock_parse_jinja2.side_effect = NsException("Error parsing cloud-init content.")
3753
3754 with self.assertRaises(NsException) as err:
3755 self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
3756 self.assertEqual(str(err.exception), "Error parsing cloud-init content.")
3757 mock_get_cloud_init.assert_called_once_with(
3758 db=db, fs=fs, location="sample-cloud-init-path"
3759 )
3760 mock_parse_jinja2.assert_called_once_with(
3761 cloud_init_content=cloud_init_content,
3762 params=None,
3763 context="sample-cloud-init-path",
3764 )
3765
3766 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3767 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3768 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
3769 self, mock_parse_jinja2, mock_get_cloud_init
3770 ):
3771 """Target_vdu has cloud-init but do not have boot-data-drive."""
3772 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3773 target_vdu["cloud-init"] = "sample-cloud-init-path"
3774 vdu2cloud_init = {}
3775 mock_get_cloud_init.return_value = cloud_init_content
3776 mock_parse_jinja2.return_value = user_data
3777 expected_result = {
3778 "user-data": user_data,
3779 }
3780 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
3781 self.assertDictEqual(result, expected_result)
3782 mock_get_cloud_init.assert_called_once_with(
3783 db=db, fs=fs, location="sample-cloud-init-path"
3784 )
3785 mock_parse_jinja2.assert_called_once_with(
3786 cloud_init_content=cloud_init_content,
3787 params=None,
3788 context="sample-cloud-init-path",
3789 )
3790
3791 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3792 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3793 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
3794 self, mock_parse_jinja2, mock_get_cloud_init
3795 ):
3796 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content."""
3797 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3798 target_vdu["cloud-init"] = "sample-cloud-init-path"
3799 target_vdu["boot-data-drive"] = "vda"
3800 vdu2cloud_init = {"sample-cloud-init-path": cloud_init_content}
3801 mock_parse_jinja2.return_value = user_data
3802 expected_result = {
3803 "user-data": user_data,
3804 "boot-data-drive": "vda",
3805 }
3806 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
3807 self.assertDictEqual(result, expected_result)
3808 mock_get_cloud_init.assert_not_called()
3809 mock_parse_jinja2.assert_called_once_with(
3810 cloud_init_content=cloud_init_content,
3811 params=None,
3812 context="sample-cloud-init-path",
3813 )
3814
3815 @patch("osm_ng_ro.ns.Ns._get_cloud_init")
3816 @patch("osm_ng_ro.ns.Ns._parse_jinja2")
3817 def test_prepare_vdu_cloud_init_no_cloud_init(
3818 self, mock_parse_jinja2, mock_get_cloud_init
3819 ):
3820 """Target_vdu do not have cloud-init."""
3821 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
3822 target_vdu["boot-data-drive"] = "vda"
3823 vdu2cloud_init = {}
3824 expected_result = {
3825 "boot-data-drive": "vda",
3826 }
3827 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
3828 self.assertDictEqual(result, expected_result)
3829 mock_get_cloud_init.assert_not_called()
3830 mock_parse_jinja2.assert_not_called()
3831
3832 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self):
3833 """ns_vld and vnf_vld both exist."""
3834 interface = {
3835 "name": "vdu-eth0",
3836 "ns-vld-id": "mgmtnet",
3837 "vnf-vld-id": "mgmt_cp_int",
3838 }
3839 expected_result = f"{ns_preffix}:vld.mgmtnet"
3840 result = self.ns._check_vld_information_of_interfaces(
3841 interface, ns_preffix, vnf_preffix
3842 )
3843 self.assertEqual(result, expected_result)
3844
3845 def test_check_vld_information_of_interfaces_empty_interfaces(self):
3846 """Interface dict is empty."""
3847 interface = {}
3848 result = self.ns._check_vld_information_of_interfaces(
3849 interface, ns_preffix, vnf_preffix
3850 )
3851 self.assertEqual(result, "")
3852
3853 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self):
3854 """Interface dict has only vnf_vld."""
3855 interface = {
3856 "name": "vdu-eth0",
3857 "vnf-vld-id": "mgmt_cp_int",
3858 }
3859 expected_result = f"{vnf_preffix}:vld.mgmt_cp_int"
3860 result = self.ns._check_vld_information_of_interfaces(
3861 interface, ns_preffix, vnf_preffix
3862 )
3863 self.assertEqual(result, expected_result)
3864
3865 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
3866 self,
3867 ):
3868 """Interface dict has only vnf_vld but vnf_preffix does not exist."""
3869 interface = {
3870 "name": "vdu-eth0",
3871 "vnf-vld-id": "mgmt_cp_int",
3872 }
3873 vnf_preffix = None
3874 with self.assertRaises(Exception) as err:
3875 self.ns._check_vld_information_of_interfaces(
3876 interface, ns_preffix, vnf_preffix
3877 )
3878 self.assertEqual(type(err), TypeError)
3879
3880 def test_prepare_interface_port_security_has_security_details(self):
3881 """Interface dict has port security details."""
3882 interface = {
3883 "name": "vdu-eth0",
3884 "ns-vld-id": "mgmtnet",
3885 "vnf-vld-id": "mgmt_cp_int",
3886 "port-security-enabled": True,
3887 "port-security-disable-strategy": "allow-address-pairs",
3888 }
3889 expected_interface = {
3890 "name": "vdu-eth0",
3891 "ns-vld-id": "mgmtnet",
3892 "vnf-vld-id": "mgmt_cp_int",
3893 "port_security": True,
3894 "port_security_disable_strategy": "allow-address-pairs",
3895 }
3896 self.ns._prepare_interface_port_security(interface)
3897 self.assertDictEqual(interface, expected_interface)
3898
3899 def test_prepare_interface_port_security_empty_interfaces(self):
3900 """Interface dict is empty."""
3901 interface = {}
3902 expected_interface = {}
3903 self.ns._prepare_interface_port_security(interface)
3904 self.assertDictEqual(interface, expected_interface)
3905
3906 def test_prepare_interface_port_security_wthout_port_security(self):
3907 """Interface dict does not have port security details."""
3908 interface = {
3909 "name": "vdu-eth0",
3910 "ns-vld-id": "mgmtnet",
3911 "vnf-vld-id": "mgmt_cp_int",
3912 }
3913 expected_interface = {
3914 "name": "vdu-eth0",
3915 "ns-vld-id": "mgmtnet",
3916 "vnf-vld-id": "mgmt_cp_int",
3917 }
3918 self.ns._prepare_interface_port_security(interface)
3919 self.assertDictEqual(interface, expected_interface)
3920
3921 def test_create_net_item_of_interface_floating_ip_port_security(self):
3922 """Interface dict has floating ip, port-security details."""
3923 interface = {
3924 "name": "vdu-eth0",
3925 "vcpi": "sample_vcpi",
3926 "port_security": True,
3927 "port_security_disable_strategy": "allow-address-pairs",
3928 "floating_ip": "10.1.1.12",
3929 "ns-vld-id": "mgmtnet",
3930 "vnf-vld-id": "mgmt_cp_int",
3931 }
3932 net_text = f"{ns_preffix}"
3933 expected_net_item = {
3934 "name": "vdu-eth0",
3935 "port_security": True,
3936 "port_security_disable_strategy": "allow-address-pairs",
3937 "floating_ip": "10.1.1.12",
3938 "net_id": f"TASK-{ns_preffix}",
3939 "type": "virtual",
3940 }
3941 result = self.ns._create_net_item_of_interface(interface, net_text)
3942 self.assertDictEqual(result, expected_net_item)
3943
3944 def test_create_net_item_of_interface_invalid_net_text(self):
3945 """net-text is invalid."""
3946 interface = {
3947 "name": "vdu-eth0",
3948 "vcpi": "sample_vcpi",
3949 "port_security": True,
3950 "port_security_disable_strategy": "allow-address-pairs",
3951 "floating_ip": "10.1.1.12",
3952 "ns-vld-id": "mgmtnet",
3953 "vnf-vld-id": "mgmt_cp_int",
3954 }
3955 net_text = None
3956 with self.assertRaises(TypeError):
3957 self.ns._create_net_item_of_interface(interface, net_text)
3958
3959 def test_create_net_item_of_interface_empty_interface(self):
3960 """Interface dict is empty."""
3961 interface = {}
3962 net_text = ns_preffix
3963 expected_net_item = {
3964 "net_id": f"TASK-{ns_preffix}",
3965 "type": "virtual",
3966 }
3967 result = self.ns._create_net_item_of_interface(interface, net_text)
3968 self.assertDictEqual(result, expected_net_item)
3969
3970 @patch("osm_ng_ro.ns.deep_get")
3971 def test_prepare_type_of_interface_type_sriov(self, mock_deep_get):
3972 """Interface type is SR-IOV."""
3973 interface = {
3974 "name": "vdu-eth0",
3975 "vcpi": "sample_vcpi",
3976 "port_security": True,
3977 "port_security_disable_strategy": "allow-address-pairs",
3978 "floating_ip": "10.1.1.12",
3979 "ns-vld-id": "mgmtnet",
3980 "vnf-vld-id": "mgmt_cp_int",
3981 "type": "SR-IOV",
3982 }
3983 mock_deep_get.return_value = "SR-IOV"
3984 net_text = ns_preffix
3985 net_item = {}
3986 expected_net_item = {
3987 "use": "data",
3988 "model": "SR-IOV",
3989 "type": "SR-IOV",
3990 }
3991 self.ns._prepare_type_of_interface(
3992 interface, tasks_by_target_record_id, net_text, net_item
3993 )
3994 self.assertDictEqual(net_item, expected_net_item)
3995 self.assertEqual(
3996 "data",
3997 tasks_by_target_record_id[net_text]["extra_dict"]["params"]["net_type"],
3998 )
3999 mock_deep_get.assert_called_once_with(
4000 tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type"
4001 )
4002
4003 @patch("osm_ng_ro.ns.deep_get")
4004 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
4005 self, mock_deep_get
4006 ):
4007 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict."""
4008 interface = {
4009 "name": "vdu-eth0",
4010 "vcpi": "sample_vcpi",
4011 "port_security": True,
4012 "port_security_disable_strategy": "allow-address-pairs",
4013 "floating_ip": "10.1.1.12",
4014 "ns-vld-id": "mgmtnet",
4015 "vnf-vld-id": "mgmt_cp_int",
4016 "type": "PCI-PASSTHROUGH",
4017 }
4018 mock_deep_get.return_value = {}
4019 tasks_by_target_record_id = {}
4020 net_text = ns_preffix
4021 net_item = {}
4022 expected_net_item = {
4023 "use": "data",
4024 "model": "PCI-PASSTHROUGH",
4025 "type": "PCI-PASSTHROUGH",
4026 }
4027 self.ns._prepare_type_of_interface(
4028 interface, tasks_by_target_record_id, net_text, net_item
4029 )
4030 self.assertDictEqual(net_item, expected_net_item)
4031 mock_deep_get.assert_called_once_with(
4032 tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type"
4033 )
4034
4035 @patch("osm_ng_ro.ns.deep_get")
4036 def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get):
4037 """Interface type is mgmt."""
4038 interface = {
4039 "name": "vdu-eth0",
4040 "vcpi": "sample_vcpi",
4041 "port_security": True,
4042 "port_security_disable_strategy": "allow-address-pairs",
4043 "floating_ip": "10.1.1.12",
4044 "ns-vld-id": "mgmtnet",
4045 "vnf-vld-id": "mgmt_cp_int",
4046 "type": "OM-MGMT",
4047 }
4048 tasks_by_target_record_id = {}
4049 net_text = ns_preffix
4050 net_item = {}
4051 expected_net_item = {
4052 "use": "mgmt",
4053 }
4054 self.ns._prepare_type_of_interface(
4055 interface, tasks_by_target_record_id, net_text, net_item
4056 )
4057 self.assertDictEqual(net_item, expected_net_item)
4058 mock_deep_get.assert_not_called()
4059
4060 @patch("osm_ng_ro.ns.deep_get")
4061 def test_prepare_type_of_interface_type_bridge(self, mock_deep_get):
4062 """Interface type is bridge."""
4063 interface = {
4064 "name": "vdu-eth0",
4065 "vcpi": "sample_vcpi",
4066 "port_security": True,
4067 "port_security_disable_strategy": "allow-address-pairs",
4068 "floating_ip": "10.1.1.12",
4069 "ns-vld-id": "mgmtnet",
4070 "vnf-vld-id": "mgmt_cp_int",
4071 }
4072 tasks_by_target_record_id = {}
4073 net_text = ns_preffix
4074 net_item = {}
4075 expected_net_item = {
4076 "use": "bridge",
4077 "model": None,
4078 }
4079 self.ns._prepare_type_of_interface(
4080 interface, tasks_by_target_record_id, net_text, net_item
4081 )
4082 self.assertDictEqual(net_item, expected_net_item)
4083 mock_deep_get.assert_not_called()
4084
4085 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4086 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4087 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4088 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4089 def test_prepare_vdu_interfaces(
4090 self,
4091 mock_type_of_interface,
4092 mock_item_of_interface,
4093 mock_port_security,
4094 mock_vld_information_of_interface,
4095 ):
4096 """Prepare vdu interfaces successfully."""
4097 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4098 interface_1 = {
4099 "name": "vdu-eth1",
4100 "ns-vld-id": "net1",
4101 "ip-address": "13.2.12.31",
4102 "mgmt-interface": True,
4103 }
4104 interface_2 = {
4105 "name": "vdu-eth2",
4106 "vnf-vld-id": "net2",
4107 "mac-address": "d0:94:66:ed:fc:e2",
4108 }
4109 interface_3 = {
4110 "name": "vdu-eth3",
4111 "ns-vld-id": "mgmtnet",
4112 }
4113 target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
4114 extra_dict = {
4115 "params": "test_params",
4116 "find_params": "test_find_params",
4117 "depends_on": [],
4118 }
4119
4120 net_text_1 = f"{ns_preffix}:net1"
4121 net_text_2 = f"{vnf_preffix}:net2"
4122 net_text_3 = f"{ns_preffix}:mgmtnet"
4123 net_item_1 = {
4124 "name": "vdu-eth1",
4125 "net_id": f"TASK-{ns_preffix}",
4126 "type": "virtual",
4127 }
4128 net_item_2 = {
4129 "name": "vdu-eth2",
4130 "net_id": f"TASK-{ns_preffix}",
4131 "type": "virtual",
4132 }
4133 net_item_3 = {
4134 "name": "vdu-eth3",
4135 "net_id": f"TASK-{ns_preffix}",
4136 "type": "virtual",
4137 }
4138 mock_item_of_interface.side_effect = [net_item_1, net_item_2, net_item_3]
4139 mock_vld_information_of_interface.side_effect = [
4140 net_text_1,
4141 net_text_2,
4142 net_text_3,
4143 ]
4144 net_list = []
4145 expected_extra_dict = {
4146 "params": "test_params",
4147 "find_params": "test_find_params",
4148 "depends_on": [net_text_1, net_text_2, net_text_3],
4149 "mgmt_vdu_interface": 0,
4150 }
4151 updated_net_item1 = deepcopy(net_item_1)
4152 updated_net_item1.update({"ip_address": "13.2.12.31"})
4153 updated_net_item2 = deepcopy(net_item_2)
4154 updated_net_item2.update({"mac_address": "d0:94:66:ed:fc:e2"})
4155 expected_net_list = [updated_net_item1, updated_net_item2, net_item_3]
4156 self.ns._prepare_vdu_interfaces(
4157 target_vdu,
4158 extra_dict,
4159 ns_preffix,
4160 vnf_preffix,
4161 self.logger,
4162 tasks_by_target_record_id,
4163 net_list,
4164 )
4165 _call_mock_vld_information_of_interface = (
4166 mock_vld_information_of_interface.call_args_list
4167 )
4168 self.assertEqual(
4169 _call_mock_vld_information_of_interface[0][0],
4170 (interface_1, ns_preffix, vnf_preffix),
4171 )
4172 self.assertEqual(
4173 _call_mock_vld_information_of_interface[1][0],
4174 (interface_2, ns_preffix, vnf_preffix),
4175 )
4176 self.assertEqual(
4177 _call_mock_vld_information_of_interface[2][0],
4178 (interface_3, ns_preffix, vnf_preffix),
4179 )
4180
4181 _call_mock_port_security = mock_port_security.call_args_list
4182 self.assertEqual(_call_mock_port_security[0].args[0], interface_1)
4183 self.assertEqual(_call_mock_port_security[1].args[0], interface_2)
4184 self.assertEqual(_call_mock_port_security[2].args[0], interface_3)
4185
4186 _call_mock_item_of_interface = mock_item_of_interface.call_args_list
4187 self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1))
4188 self.assertEqual(_call_mock_item_of_interface[1][0], (interface_2, net_text_2))
4189 self.assertEqual(_call_mock_item_of_interface[2][0], (interface_3, net_text_3))
4190
4191 _call_mock_type_of_interface = mock_type_of_interface.call_args_list
4192 self.assertEqual(
4193 _call_mock_type_of_interface[0][0],
4194 (interface_1, tasks_by_target_record_id, net_text_1, net_item_1),
4195 )
4196 self.assertEqual(
4197 _call_mock_type_of_interface[1][0],
4198 (interface_2, tasks_by_target_record_id, net_text_2, net_item_2),
4199 )
4200 self.assertEqual(
4201 _call_mock_type_of_interface[2][0],
4202 (interface_3, tasks_by_target_record_id, net_text_3, net_item_3),
4203 )
4204 self.assertEqual(net_list, expected_net_list)
4205 self.assertEqual(extra_dict, expected_extra_dict)
4206 self.logger.error.assert_not_called()
4207
4208 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4209 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4210 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4211 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4212 def test_prepare_vdu_interfaces_create_net_item_raise_exception(
4213 self,
4214 mock_type_of_interface,
4215 mock_item_of_interface,
4216 mock_port_security,
4217 mock_vld_information_of_interface,
4218 ):
4219 """Prepare vdu interfaces, create_net_item_of_interface method raise exception."""
4220 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4221 interface_1 = {
4222 "name": "vdu-eth1",
4223 "ns-vld-id": "net1",
4224 "ip-address": "13.2.12.31",
4225 "mgmt-interface": True,
4226 }
4227 interface_2 = {
4228 "name": "vdu-eth2",
4229 "vnf-vld-id": "net2",
4230 "mac-address": "d0:94:66:ed:fc:e2",
4231 }
4232 interface_3 = {
4233 "name": "vdu-eth3",
4234 "ns-vld-id": "mgmtnet",
4235 }
4236 target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
4237 extra_dict = {
4238 "params": "test_params",
4239 "find_params": "test_find_params",
4240 "depends_on": [],
4241 }
4242 net_text_1 = f"{ns_preffix}:net1"
4243 mock_item_of_interface.side_effect = [TypeError, TypeError, TypeError]
4244
4245 mock_vld_information_of_interface.side_effect = [net_text_1]
4246 net_list = []
4247 expected_extra_dict = {
4248 "params": "test_params",
4249 "find_params": "test_find_params",
4250 "depends_on": [net_text_1],
4251 }
4252 with self.assertRaises(TypeError):
4253 self.ns._prepare_vdu_interfaces(
4254 target_vdu,
4255 extra_dict,
4256 ns_preffix,
4257 vnf_preffix,
4258 self.logger,
4259 tasks_by_target_record_id,
4260 net_list,
4261 )
4262
4263 _call_mock_vld_information_of_interface = (
4264 mock_vld_information_of_interface.call_args_list
4265 )
4266 self.assertEqual(
4267 _call_mock_vld_information_of_interface[0][0],
4268 (interface_1, ns_preffix, vnf_preffix),
4269 )
4270
4271 _call_mock_port_security = mock_port_security.call_args_list
4272 self.assertEqual(_call_mock_port_security[0].args[0], interface_1)
4273
4274 _call_mock_item_of_interface = mock_item_of_interface.call_args_list
4275 self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1))
4276
4277 mock_type_of_interface.assert_not_called()
4278 self.logger.error.assert_not_called()
4279 self.assertEqual(net_list, [])
4280 self.assertEqual(extra_dict, expected_extra_dict)
4281
4282 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4283 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4284 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4285 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4286 def test_prepare_vdu_interfaces_vld_information_is_empty(
4287 self,
4288 mock_type_of_interface,
4289 mock_item_of_interface,
4290 mock_port_security,
4291 mock_vld_information_of_interface,
4292 ):
4293 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result."""
4294 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4295 interface_1 = {
4296 "name": "vdu-eth1",
4297 "ns-vld-id": "net1",
4298 "ip-address": "13.2.12.31",
4299 "mgmt-interface": True,
4300 }
4301 interface_2 = {
4302 "name": "vdu-eth2",
4303 "vnf-vld-id": "net2",
4304 "mac-address": "d0:94:66:ed:fc:e2",
4305 }
4306 interface_3 = {
4307 "name": "vdu-eth3",
4308 "ns-vld-id": "mgmtnet",
4309 }
4310 target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
4311 extra_dict = {
4312 "params": "test_params",
4313 "find_params": "test_find_params",
4314 "depends_on": [],
4315 }
4316 mock_vld_information_of_interface.side_effect = ["", "", ""]
4317 net_list = []
4318 self.ns._prepare_vdu_interfaces(
4319 target_vdu,
4320 extra_dict,
4321 ns_preffix,
4322 vnf_preffix,
4323 self.logger,
4324 tasks_by_target_record_id,
4325 net_list,
4326 )
4327
4328 _call_mock_vld_information_of_interface = (
4329 mock_vld_information_of_interface.call_args_list
4330 )
4331 self.assertEqual(
4332 _call_mock_vld_information_of_interface[0][0],
4333 (interface_1, ns_preffix, vnf_preffix),
4334 )
4335 self.assertEqual(
4336 _call_mock_vld_information_of_interface[1][0],
4337 (interface_2, ns_preffix, vnf_preffix),
4338 )
4339 self.assertEqual(
4340 _call_mock_vld_information_of_interface[2][0],
4341 (interface_3, ns_preffix, vnf_preffix),
4342 )
4343
4344 _call_logger = self.logger.error.call_args_list
4345 self.assertEqual(
4346 _call_logger[0][0],
4347 ("Interface 0 from vdu several_volumes-VM not connected to any vld",),
4348 )
4349 self.assertEqual(
4350 _call_logger[1][0],
4351 ("Interface 1 from vdu several_volumes-VM not connected to any vld",),
4352 )
4353 self.assertEqual(
4354 _call_logger[2][0],
4355 ("Interface 2 from vdu several_volumes-VM not connected to any vld",),
4356 )
4357 self.assertEqual(net_list, [])
4358 self.assertEqual(
4359 extra_dict,
4360 {
4361 "params": "test_params",
4362 "find_params": "test_find_params",
4363 "depends_on": [],
4364 },
4365 )
4366
4367 mock_item_of_interface.assert_not_called()
4368 mock_port_security.assert_not_called()
4369 mock_type_of_interface.assert_not_called()
4370
4371 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
4372 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
4373 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
4374 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
4375 def test_prepare_vdu_interfaces_empty_interface_list(
4376 self,
4377 mock_type_of_interface,
4378 mock_item_of_interface,
4379 mock_port_security,
4380 mock_vld_information_of_interface,
4381 ):
4382 """Prepare vdu interfaces, interface list is empty."""
4383 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4384 target_vdu["interfaces"] = []
4385 extra_dict = {}
4386 net_list = []
4387 self.ns._prepare_vdu_interfaces(
4388 target_vdu,
4389 extra_dict,
4390 ns_preffix,
4391 vnf_preffix,
4392 self.logger,
4393 tasks_by_target_record_id,
4394 net_list,
4395 )
4396 mock_type_of_interface.assert_not_called()
4397 mock_vld_information_of_interface.assert_not_called()
4398 mock_item_of_interface.assert_not_called()
4399 mock_port_security.assert_not_called()
4400
4401 def test_prepare_vdu_ssh_keys(self):
4402 """Target_vdu has ssh-keys and ro_nsr_public_key exists."""
4403 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4404 target_vdu["ssh-keys"] = ["sample-ssh-key"]
4405 ro_nsr_public_key = {"public_key": "path_of_public_key"}
4406 target_vdu["ssh-access-required"] = True
4407 cloud_config = {}
4408 expected_cloud_config = {
4409 "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]
4410 }
4411 self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
4412 self.assertDictEqual(cloud_config, expected_cloud_config)
4413
4414 def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
4415 """Target_vdu does not have ssh-keys."""
4416 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4417 ro_nsr_public_key = {"public_key": "path_of_public_key"}
4418 target_vdu["ssh-access-required"] = True
4419 cloud_config = {}
4420 expected_cloud_config = {"key-pairs": [{"public_key": "path_of_public_key"}]}
4421 self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
4422 self.assertDictEqual(cloud_config, expected_cloud_config)
4423
4424 def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
4425 """Target_vdu has ssh-keys, ssh-access is not required."""
4426 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4427 target_vdu["ssh-keys"] = ["sample-ssh-key"]
4428 ro_nsr_public_key = {"public_key": "path_of_public_key"}
4429 target_vdu["ssh-access-required"] = False
4430 cloud_config = {}
4431 expected_cloud_config = {"key-pairs": ["sample-ssh-key"]}
4432 self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
4433 self.assertDictEqual(cloud_config, expected_cloud_config)
4434
4435 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4436 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4437 def test_add_persistent_root_disk_to_disk_list_keep_false(
4438 self, mock_volume_keeping_required, mock_select_persistent_root_disk
4439 ):
4440 """Add persistent root disk to disk_list, keep volume set to False."""
4441 root_disk = {
4442 "id": "persistent-root-volume",
4443 "type-of-storage": "persistent-storage:persistent-storage",
4444 "size-of-storage": "10",
4445 }
4446 mock_select_persistent_root_disk.return_value = root_disk
4447 vnfd = deepcopy(vnfd_wth_persistent_storage)
4448 vnfd["virtual-storage-desc"][1] = root_disk
4449 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4450 persistent_root_disk = {}
4451 disk_list = []
4452 mock_volume_keeping_required.return_value = False
4453 expected_disk_list = [
4454 {
4455 "image_id": "ubuntu20.04",
4456 "size": "10",
4457 "keep": False,
4458 }
4459 ]
4460 self.ns._add_persistent_root_disk_to_disk_list(
4461 vnfd, target_vdu, persistent_root_disk, disk_list
4462 )
4463 self.assertEqual(disk_list, expected_disk_list)
4464 mock_select_persistent_root_disk.assert_called_once()
4465 mock_volume_keeping_required.assert_called_once()
4466
4467 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4468 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4469 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
4470 self, mock_volume_keeping_required, mock_select_persistent_root_disk
4471 ):
4472 """Add persistent root disk to disk_list"""
4473 root_disk = {
4474 "id": "persistent-root-volume",
4475 "type-of-storage": "persistent-storage:persistent-storage",
4476 "size-of-storage": "10",
4477 }
4478 mock_select_persistent_root_disk.side_effect = AttributeError
4479 vnfd = deepcopy(vnfd_wth_persistent_storage)
4480 vnfd["virtual-storage-desc"][1] = root_disk
4481 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4482 persistent_root_disk = {}
4483 disk_list = []
4484 with self.assertRaises(AttributeError):
4485 self.ns._add_persistent_root_disk_to_disk_list(
4486 vnfd, target_vdu, persistent_root_disk, disk_list
4487 )
4488 self.assertEqual(disk_list, [])
4489 mock_select_persistent_root_disk.assert_called_once()
4490 mock_volume_keeping_required.assert_not_called()
4491
4492 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4493 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4494 def test_add_persistent_root_disk_to_disk_list_keep_true(
4495 self, mock_volume_keeping_required, mock_select_persistent_root_disk
4496 ):
4497 """Add persistent root disk, keeo volume set to True."""
4498 vnfd = deepcopy(vnfd_wth_persistent_storage)
4499 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4500 mock_volume_keeping_required.return_value = True
4501 root_disk = {
4502 "id": "persistent-root-volume",
4503 "type-of-storage": "persistent-storage:persistent-storage",
4504 "size-of-storage": "10",
4505 "vdu-storage-requirements": [
4506 {"key": "keep-volume", "value": "true"},
4507 ],
4508 }
4509 mock_select_persistent_root_disk.return_value = root_disk
4510 persistent_root_disk = {}
4511 disk_list = []
4512 expected_disk_list = [
4513 {
4514 "image_id": "ubuntu20.04",
4515 "size": "10",
4516 "keep": True,
4517 }
4518 ]
4519 self.ns._add_persistent_root_disk_to_disk_list(
4520 vnfd, target_vdu, persistent_root_disk, disk_list
4521 )
4522 self.assertEqual(disk_list, expected_disk_list)
4523 mock_volume_keeping_required.assert_called_once_with(root_disk)
4524
4525 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4526 def test_add_persistent_ordinary_disk_to_disk_list(
4527 self, mock_volume_keeping_required
4528 ):
4529 """Add persistent ordinary disk, keeo volume set to True."""
4530 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4531 mock_volume_keeping_required.return_value = False
4532 persistent_root_disk = {
4533 "persistent-root-volume": {
4534 "image_id": "ubuntu20.04",
4535 "size": "10",
4536 "keep": True,
4537 }
4538 }
4539 ordinary_disk = {
4540 "id": "persistent-volume2",
4541 "type-of-storage": "persistent-storage:persistent-storage",
4542 "size-of-storage": "10",
4543 }
4544 persistent_ordinary_disk = {}
4545 disk_list = []
4546 expected_disk_list = [
4547 {
4548 "size": "10",
4549 "keep": False,
4550 "multiattach": False,
4551 "name": "persistent-volume2",
4552 }
4553 ]
4554 self.ns._add_persistent_ordinary_disks_to_disk_list(
4555 target_vdu, persistent_root_disk, persistent_ordinary_disk, disk_list
4556 )
4557 self.assertEqual(disk_list, expected_disk_list)
4558 mock_volume_keeping_required.assert_called_once_with(ordinary_disk)
4559
4560 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4561 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
4562 self, mock_volume_keeping_required
4563 ):
4564 """Add persistent ordinary disk, vsd id is in root_disk dict."""
4565 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4566 mock_volume_keeping_required.return_value = False
4567 persistent_root_disk = {
4568 "persistent-root-volume": {
4569 "image_id": "ubuntu20.04",
4570 "size": "10",
4571 "keep": True,
4572 },
4573 "persistent-volume2": {
4574 "size": "10",
4575 },
4576 }
4577 persistent_ordinary_disk = {}
4578 disk_list = []
4579
4580 self.ns._add_persistent_ordinary_disks_to_disk_list(
4581 target_vdu, persistent_root_disk, persistent_ordinary_disk, disk_list
4582 )
4583 self.assertEqual(disk_list, [])
4584 mock_volume_keeping_required.assert_not_called()
4585
4586 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4587 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4588 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
4589 self, mock_volume_keeping_required, mock_select_persistent_root_disk
4590 ):
4591 """VNFD does not have persistent storage."""
4592 vnfd = deepcopy(vnfd_wthout_persistent_storage)
4593 target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
4594 mock_select_persistent_root_disk.return_value = None
4595 persistent_root_disk = {}
4596 disk_list = []
4597 self.ns._add_persistent_root_disk_to_disk_list(
4598 vnfd, target_vdu, persistent_root_disk, disk_list
4599 )
4600 self.assertEqual(disk_list, [])
4601 self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
4602 mock_volume_keeping_required.assert_not_called()
4603
4604 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
4605 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
4606 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
4607 self, mock_volume_keeping_required, mock_select_persistent_root_disk
4608 ):
4609 """Persistent_root_disk dict is empty."""
4610 vnfd = deepcopy(vnfd_wthout_persistent_storage)
4611 target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
4612 mock_select_persistent_root_disk.return_value = None
4613 persistent_root_disk = {}
4614 disk_list = []
4615 self.ns._add_persistent_root_disk_to_disk_list(
4616 vnfd, target_vdu, persistent_root_disk, disk_list
4617 )
4618 self.assertEqual(disk_list, [])
4619 self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
4620 mock_volume_keeping_required.assert_not_called()
4621
4622 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
4623 """Invalid extra dict."""
4624 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4625 target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
4626 extra_dict = {}
4627 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4628 with self.assertRaises(NsException) as err:
4629 self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)
4630 self.assertEqual(str(err.exception), "Invalid extra_dict format.")
4631
4632 def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
4633 """There is one affinity-group."""
4634 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4635 target_vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
4636 extra_dict = {"depends_on": []}
4637 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4638 affinity_group_txt = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
4639 expected_result = [{"affinity_group_id": "TASK-" + affinity_group_txt}]
4640 expected_extra_dict = {"depends_on": [affinity_group_txt]}
4641 result = self.ns._prepare_vdu_affinity_group_list(
4642 target_vdu, extra_dict, ns_preffix
4643 )
4644 self.assertDictEqual(extra_dict, expected_extra_dict)
4645 self.assertEqual(result, expected_result)
4646
4647 def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
4648 """There are two affinity-groups."""
4649 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4650 target_vdu["affinity-or-anti-affinity-group-id"] = [
4651 "affinity-group-id1",
4652 "affinity-group-id2",
4653 ]
4654 extra_dict = {"depends_on": []}
4655 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4656 affinity_group_txt1 = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1"
4657 affinity_group_txt2 = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2"
4658 expected_result = [
4659 {"affinity_group_id": "TASK-" + affinity_group_txt1},
4660 {"affinity_group_id": "TASK-" + affinity_group_txt2},
4661 ]
4662 expected_extra_dict = {"depends_on": [affinity_group_txt1, affinity_group_txt2]}
4663 result = self.ns._prepare_vdu_affinity_group_list(
4664 target_vdu, extra_dict, ns_preffix
4665 )
4666 self.assertDictEqual(extra_dict, expected_extra_dict)
4667 self.assertEqual(result, expected_result)
4668
4669 def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
4670 """There is not any affinity-group."""
4671 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4672 extra_dict = {"depends_on": []}
4673 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
4674 result = self.ns._prepare_vdu_affinity_group_list(
4675 target_vdu, extra_dict, ns_preffix
4676 )
4677 self.assertDictEqual(extra_dict, {"depends_on": []})
4678 self.assertEqual(result, [])
4679
4680 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4681 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4682 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4683 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4684 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4685 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4686 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4687 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4688 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4689 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4690 def test_process_vdu_params_with_inst_vol_list(
4691 self,
4692 mock_prepare_vdu_affinity_group_list,
4693 mock_add_persistent_ordinary_disks_to_disk_list,
4694 mock_add_persistent_root_disk_to_disk_list,
4695 mock_find_persistent_volumes,
4696 mock_find_persistent_root_volumes,
4697 mock_prepare_vdu_ssh_keys,
4698 mock_prepare_vdu_cloud_init,
4699 mock_prepare_vdu_interfaces,
4700 mock_locate_vdu_interfaces,
4701 mock_sort_vdu_interfaces,
4702 ):
4703 """Instantiation volume list is empty."""
4704 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
4705
4706 target_vdu["interfaces"] = interfaces_wth_all_positions
4707
4708 vdu_instantiation_vol_list = [
4709 {
4710 "vim-volume-id": vim_volume_id,
4711 "name": "persistent-volume2",
4712 }
4713 ]
4714 target_vdu["additionalParams"] = {
4715 "OSM": {"vdu_volumes": vdu_instantiation_vol_list}
4716 }
4717 mock_prepare_vdu_cloud_init.return_value = {}
4718 mock_prepare_vdu_affinity_group_list.return_value = []
4719 persistent_root_disk = {
4720 "persistent-root-volume": {
4721 "image_id": "ubuntu20.04",
4722 "size": "10",
4723 }
4724 }
4725 mock_find_persistent_root_volumes.return_value = persistent_root_disk
4726
4727 new_kwargs = deepcopy(kwargs)
4728 new_kwargs.update(
4729 {
4730 "vnfr_id": vnfr_id,
4731 "nsr_id": nsr_id,
4732 "tasks_by_target_record_id": {},
4733 "logger": "logger",
4734 }
4735 )
4736 expected_extra_dict_copy = deepcopy(expected_extra_dict)
4737 vnfd = deepcopy(vnfd_wth_persistent_storage)
4738 db.get_one.return_value = vnfd
4739 result = Ns._process_vdu_params(
4740 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
4741 )
4742 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
4743 mock_locate_vdu_interfaces.assert_not_called()
4744 mock_prepare_vdu_cloud_init.assert_called_once()
4745 mock_add_persistent_root_disk_to_disk_list.assert_not_called()
4746 mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
4747 mock_prepare_vdu_interfaces.assert_called_once_with(
4748 target_vdu,
4749 expected_extra_dict_copy,
4750 ns_preffix,
4751 vnf_preffix,
4752 "logger",
4753 {},
4754 [],
4755 )
4756 self.assertDictEqual(result, expected_extra_dict_copy)
4757 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
4758 mock_prepare_vdu_affinity_group_list.assert_called_once()
4759 mock_find_persistent_volumes.assert_called_once_with(
4760 persistent_root_disk, target_vdu, vdu_instantiation_vol_list, []
4761 )
4762
4763 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4764 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4765 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4766 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4767 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4768 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4769 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4770 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4771 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4772 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4773 def test_process_vdu_params_with_inst_flavor_id(
4774 self,
4775 mock_prepare_vdu_affinity_group_list,
4776 mock_add_persistent_ordinary_disks_to_disk_list,
4777 mock_add_persistent_root_disk_to_disk_list,
4778 mock_find_persistent_volumes,
4779 mock_find_persistent_root_volumes,
4780 mock_prepare_vdu_ssh_keys,
4781 mock_prepare_vdu_cloud_init,
4782 mock_prepare_vdu_interfaces,
4783 mock_locate_vdu_interfaces,
4784 mock_sort_vdu_interfaces,
4785 ):
4786 """Instantiation volume list is empty."""
4787 target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
4788
4789 target_vdu["interfaces"] = interfaces_wth_all_positions
4790
4791 vdu_instantiation_flavor_id = "flavor_test"
4792
4793 target_vdu["additionalParams"] = {
4794 "OSM": {"vim_flavor_id": vdu_instantiation_flavor_id}
4795 }
4796 mock_prepare_vdu_cloud_init.return_value = {}
4797 mock_prepare_vdu_affinity_group_list.return_value = []
4798
4799 new_kwargs = deepcopy(kwargs)
4800 new_kwargs.update(
4801 {
4802 "vnfr_id": vnfr_id,
4803 "nsr_id": nsr_id,
4804 "tasks_by_target_record_id": {},
4805 "logger": "logger",
4806 }
4807 )
4808 expected_extra_dict_copy = deepcopy(expected_extra_dict3)
4809 vnfd = deepcopy(vnfd_wth_persistent_storage)
4810 db.get_one.return_value = vnfd
4811 result = Ns._process_vdu_params(
4812 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
4813 )
4814 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
4815 mock_locate_vdu_interfaces.assert_not_called()
4816 mock_prepare_vdu_cloud_init.assert_called_once()
4817 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
4818 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
4819 mock_prepare_vdu_interfaces.assert_called_once_with(
4820 target_vdu,
4821 expected_extra_dict_copy,
4822 ns_preffix,
4823 vnf_preffix,
4824 "logger",
4825 {},
4826 [],
4827 )
4828 self.assertDictEqual(result, expected_extra_dict_copy)
4829 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
4830 mock_prepare_vdu_affinity_group_list.assert_called_once()
4831 mock_find_persistent_volumes.assert_not_called()
4832
4833 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4834 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4835 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4836 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4837 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4838 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4839 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4840 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4841 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4842 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4843 def test_process_vdu_params_wth_affinity_groups(
4844 self,
4845 mock_prepare_vdu_affinity_group_list,
4846 mock_add_persistent_ordinary_disks_to_disk_list,
4847 mock_add_persistent_root_disk_to_disk_list,
4848 mock_find_persistent_volumes,
4849 mock_find_persistent_root_volumes,
4850 mock_prepare_vdu_ssh_keys,
4851 mock_prepare_vdu_cloud_init,
4852 mock_prepare_vdu_interfaces,
4853 mock_locate_vdu_interfaces,
4854 mock_sort_vdu_interfaces,
4855 ):
4856 """There is cloud-config."""
4857 target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
4858
4859 self.maxDiff = None
4860 target_vdu["interfaces"] = interfaces_wth_all_positions
4861 mock_prepare_vdu_cloud_init.return_value = {}
4862 mock_prepare_vdu_affinity_group_list.return_value = [
4863 "affinity_group_1",
4864 "affinity_group_2",
4865 ]
4866
4867 new_kwargs = deepcopy(kwargs)
4868 new_kwargs.update(
4869 {
4870 "vnfr_id": vnfr_id,
4871 "nsr_id": nsr_id,
4872 "tasks_by_target_record_id": {},
4873 "logger": "logger",
4874 }
4875 )
4876 expected_extra_dict3 = deepcopy(expected_extra_dict2)
4877 expected_extra_dict3["params"]["affinity_group_list"] = [
4878 "affinity_group_1",
4879 "affinity_group_2",
4880 ]
4881 vnfd = deepcopy(vnfd_wth_persistent_storage)
4882 db.get_one.return_value = vnfd
4883 result = Ns._process_vdu_params(
4884 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
4885 )
4886 self.assertDictEqual(result, expected_extra_dict3)
4887 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
4888 mock_locate_vdu_interfaces.assert_not_called()
4889 mock_prepare_vdu_cloud_init.assert_called_once()
4890 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
4891 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
4892 mock_prepare_vdu_interfaces.assert_called_once_with(
4893 target_vdu,
4894 expected_extra_dict3,
4895 ns_preffix,
4896 vnf_preffix,
4897 "logger",
4898 {},
4899 [],
4900 )
4901
4902 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
4903 mock_prepare_vdu_affinity_group_list.assert_called_once()
4904 mock_find_persistent_volumes.assert_not_called()
4905
4906 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4907 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4908 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4909 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4910 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4911 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4912 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4913 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4914 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4915 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4916 def test_process_vdu_params_wth_cloud_config(
4917 self,
4918 mock_prepare_vdu_affinity_group_list,
4919 mock_add_persistent_ordinary_disks_to_disk_list,
4920 mock_add_persistent_root_disk_to_disk_list,
4921 mock_find_persistent_volumes,
4922 mock_find_persistent_root_volumes,
4923 mock_prepare_vdu_ssh_keys,
4924 mock_prepare_vdu_cloud_init,
4925 mock_prepare_vdu_interfaces,
4926 mock_locate_vdu_interfaces,
4927 mock_sort_vdu_interfaces,
4928 ):
4929 """There is cloud-config."""
4930 target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
4931
4932 self.maxDiff = None
4933 target_vdu["interfaces"] = interfaces_wth_all_positions
4934 mock_prepare_vdu_cloud_init.return_value = {
4935 "user-data": user_data,
4936 "boot-data-drive": "vda",
4937 }
4938 mock_prepare_vdu_affinity_group_list.return_value = []
4939
4940 new_kwargs = deepcopy(kwargs)
4941 new_kwargs.update(
4942 {
4943 "vnfr_id": vnfr_id,
4944 "nsr_id": nsr_id,
4945 "tasks_by_target_record_id": {},
4946 "logger": "logger",
4947 }
4948 )
4949 expected_extra_dict3 = deepcopy(expected_extra_dict2)
4950 expected_extra_dict3["params"]["cloud_config"] = {
4951 "user-data": user_data,
4952 "boot-data-drive": "vda",
4953 }
4954 vnfd = deepcopy(vnfd_wth_persistent_storage)
4955 db.get_one.return_value = vnfd
4956 result = Ns._process_vdu_params(
4957 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
4958 )
4959 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
4960 mock_locate_vdu_interfaces.assert_not_called()
4961 mock_prepare_vdu_cloud_init.assert_called_once()
4962 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
4963 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
4964 mock_prepare_vdu_interfaces.assert_called_once_with(
4965 target_vdu,
4966 expected_extra_dict3,
4967 ns_preffix,
4968 vnf_preffix,
4969 "logger",
4970 {},
4971 [],
4972 )
4973 self.assertDictEqual(result, expected_extra_dict3)
4974 mock_prepare_vdu_ssh_keys.assert_called_once_with(
4975 target_vdu, None, {"user-data": user_data, "boot-data-drive": "vda"}
4976 )
4977 mock_prepare_vdu_affinity_group_list.assert_called_once()
4978 mock_find_persistent_volumes.assert_not_called()
4979
4980 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
4981 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
4982 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
4983 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
4984 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
4985 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
4986 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
4987 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
4988 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
4989 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
4990 def test_process_vdu_params_wthout_persistent_storage(
4991 self,
4992 mock_prepare_vdu_affinity_group_list,
4993 mock_add_persistent_ordinary_disks_to_disk_list,
4994 mock_add_persistent_root_disk_to_disk_list,
4995 mock_find_persistent_volumes,
4996 mock_find_persistent_root_volumes,
4997 mock_prepare_vdu_ssh_keys,
4998 mock_prepare_vdu_cloud_init,
4999 mock_prepare_vdu_interfaces,
5000 mock_locate_vdu_interfaces,
5001 mock_sort_vdu_interfaces,
5002 ):
5003 """There is not any persistent storage."""
5004 target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
5005
5006 self.maxDiff = None
5007 target_vdu["interfaces"] = interfaces_wth_all_positions
5008 mock_prepare_vdu_cloud_init.return_value = {}
5009 mock_prepare_vdu_affinity_group_list.return_value = []
5010
5011 new_kwargs = deepcopy(kwargs)
5012 new_kwargs.update(
5013 {
5014 "vnfr_id": vnfr_id,
5015 "nsr_id": nsr_id,
5016 "tasks_by_target_record_id": {},
5017 "logger": "logger",
5018 }
5019 )
5020 expected_extra_dict_copy = deepcopy(expected_extra_dict2)
5021 vnfd = deepcopy(vnfd_wthout_persistent_storage)
5022 db.get_one.return_value = vnfd
5023 result = Ns._process_vdu_params(
5024 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
5025 )
5026 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
5027 mock_locate_vdu_interfaces.assert_not_called()
5028 mock_prepare_vdu_cloud_init.assert_called_once()
5029 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
5030 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
5031 mock_prepare_vdu_interfaces.assert_called_once_with(
5032 target_vdu,
5033 expected_extra_dict_copy,
5034 ns_preffix,
5035 vnf_preffix,
5036 "logger",
5037 {},
5038 [],
5039 )
5040 self.assertDictEqual(result, expected_extra_dict_copy)
5041 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
5042 mock_prepare_vdu_affinity_group_list.assert_called_once()
5043 mock_find_persistent_volumes.assert_not_called()
5044
5045 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5046 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5047 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5048 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5049 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5050 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5051 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5052 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5053 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5054 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5055 def test_process_vdu_params_interfaces_partially_located(
5056 self,
5057 mock_prepare_vdu_affinity_group_list,
5058 mock_add_persistent_ordinary_disks_to_disk_list,
5059 mock_add_persistent_root_disk_to_disk_list,
5060 mock_find_persistent_volumes,
5061 mock_find_persistent_root_volumes,
5062 mock_prepare_vdu_ssh_keys,
5063 mock_prepare_vdu_cloud_init,
5064 mock_prepare_vdu_interfaces,
5065 mock_locate_vdu_interfaces,
5066 mock_sort_vdu_interfaces,
5067 ):
5068 """Some interfaces have position."""
5069 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
5070
5071 self.maxDiff = None
5072 target_vdu["interfaces"] = [
5073 {
5074 "name": "vdu-eth1",
5075 "ns-vld-id": "net1",
5076 },
5077 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
5078 {
5079 "name": "vdu-eth3",
5080 "ns-vld-id": "mgmtnet",
5081 },
5082 ]
5083 mock_prepare_vdu_cloud_init.return_value = {}
5084 mock_prepare_vdu_affinity_group_list.return_value = []
5085 persistent_root_disk = {
5086 "persistent-root-volume": {
5087 "image_id": "ubuntu20.04",
5088 "size": "10",
5089 "keep": True,
5090 }
5091 }
5092 mock_find_persistent_root_volumes.return_value = persistent_root_disk
5093
5094 new_kwargs = deepcopy(kwargs)
5095 new_kwargs.update(
5096 {
5097 "vnfr_id": vnfr_id,
5098 "nsr_id": nsr_id,
5099 "tasks_by_target_record_id": {},
5100 "logger": "logger",
5101 }
5102 )
5103
5104 vnfd = deepcopy(vnfd_wth_persistent_storage)
5105 db.get_one.return_value = vnfd
5106 result = Ns._process_vdu_params(
5107 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
5108 )
5109 expected_extra_dict_copy = deepcopy(expected_extra_dict)
5110 mock_sort_vdu_interfaces.assert_not_called()
5111 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
5112 mock_prepare_vdu_cloud_init.assert_called_once()
5113 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
5114 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
5115 mock_prepare_vdu_interfaces.assert_called_once_with(
5116 target_vdu,
5117 expected_extra_dict_copy,
5118 ns_preffix,
5119 vnf_preffix,
5120 "logger",
5121 {},
5122 [],
5123 )
5124 self.assertDictEqual(result, expected_extra_dict_copy)
5125 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
5126 mock_prepare_vdu_affinity_group_list.assert_called_once()
5127 mock_find_persistent_volumes.assert_not_called()
5128 mock_find_persistent_root_volumes.assert_not_called()
5129
5130 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5131 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5132 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5133 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5134 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5135 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5136 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5137 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5138 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5139 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5140 def test_process_vdu_params_no_interface_position(
5141 self,
5142 mock_prepare_vdu_affinity_group_list,
5143 mock_add_persistent_ordinary_disks_to_disk_list,
5144 mock_add_persistent_root_disk_to_disk_list,
5145 mock_find_persistent_volumes,
5146 mock_find_persistent_root_volumes,
5147 mock_prepare_vdu_ssh_keys,
5148 mock_prepare_vdu_cloud_init,
5149 mock_prepare_vdu_interfaces,
5150 mock_locate_vdu_interfaces,
5151 mock_sort_vdu_interfaces,
5152 ):
5153 """Interfaces do not have position."""
5154 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
5155
5156 self.maxDiff = None
5157 target_vdu["interfaces"] = interfaces_wthout_positions
5158 mock_prepare_vdu_cloud_init.return_value = {}
5159 mock_prepare_vdu_affinity_group_list.return_value = []
5160 persistent_root_disk = {
5161 "persistent-root-volume": {
5162 "image_id": "ubuntu20.04",
5163 "size": "10",
5164 "keep": True,
5165 }
5166 }
5167 mock_find_persistent_root_volumes.return_value = persistent_root_disk
5168 new_kwargs = deepcopy(kwargs)
5169 new_kwargs.update(
5170 {
5171 "vnfr_id": vnfr_id,
5172 "nsr_id": nsr_id,
5173 "tasks_by_target_record_id": {},
5174 "logger": "logger",
5175 }
5176 )
5177
5178 vnfd = deepcopy(vnfd_wth_persistent_storage)
5179 db.get_one.return_value = vnfd
5180 result = Ns._process_vdu_params(
5181 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
5182 )
5183 expected_extra_dict_copy = deepcopy(expected_extra_dict)
5184 mock_sort_vdu_interfaces.assert_not_called()
5185 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
5186 mock_prepare_vdu_cloud_init.assert_called_once()
5187 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
5188 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
5189 mock_prepare_vdu_interfaces.assert_called_once_with(
5190 target_vdu,
5191 expected_extra_dict_copy,
5192 ns_preffix,
5193 vnf_preffix,
5194 "logger",
5195 {},
5196 [],
5197 )
5198 self.assertDictEqual(result, expected_extra_dict_copy)
5199 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
5200 mock_prepare_vdu_affinity_group_list.assert_called_once()
5201 mock_find_persistent_volumes.assert_not_called()
5202 mock_find_persistent_root_volumes.assert_not_called()
5203
5204 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5205 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5206 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5207 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5208 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5209 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5210 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5211 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5212 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5213 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5214 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
5215 self,
5216 mock_prepare_vdu_affinity_group_list,
5217 mock_add_persistent_ordinary_disks_to_disk_list,
5218 mock_add_persistent_root_disk_to_disk_list,
5219 mock_find_persistent_volumes,
5220 mock_find_persistent_root_volumes,
5221 mock_prepare_vdu_ssh_keys,
5222 mock_prepare_vdu_cloud_init,
5223 mock_prepare_vdu_interfaces,
5224 mock_locate_vdu_interfaces,
5225 mock_sort_vdu_interfaces,
5226 ):
5227 """Prepare vdu interfaces method raises exception."""
5228 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
5229
5230 self.maxDiff = None
5231 target_vdu["interfaces"] = interfaces_wthout_positions
5232 mock_prepare_vdu_cloud_init.return_value = {}
5233 mock_prepare_vdu_affinity_group_list.return_value = []
5234 persistent_root_disk = {
5235 "persistent-root-volume": {
5236 "image_id": "ubuntu20.04",
5237 "size": "10",
5238 "keep": True,
5239 }
5240 }
5241 mock_find_persistent_root_volumes.return_value = persistent_root_disk
5242 new_kwargs = deepcopy(kwargs)
5243 new_kwargs.update(
5244 {
5245 "vnfr_id": vnfr_id,
5246 "nsr_id": nsr_id,
5247 "tasks_by_target_record_id": {},
5248 "logger": "logger",
5249 }
5250 )
5251 mock_prepare_vdu_interfaces.side_effect = TypeError
5252
5253 vnfd = deepcopy(vnfd_wth_persistent_storage)
5254 db.get_one.return_value = vnfd
5255 with self.assertRaises(Exception) as err:
5256 Ns._process_vdu_params(
5257 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
5258 )
5259 self.assertEqual(type(err), TypeError)
5260 mock_sort_vdu_interfaces.assert_not_called()
5261 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
5262 mock_prepare_vdu_cloud_init.assert_not_called()
5263 mock_add_persistent_root_disk_to_disk_list.assert_not_called()
5264 mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
5265 mock_prepare_vdu_interfaces.assert_called_once()
5266 mock_prepare_vdu_ssh_keys.assert_not_called()
5267 mock_prepare_vdu_affinity_group_list.assert_not_called()
5268 mock_find_persistent_volumes.assert_not_called()
5269 mock_find_persistent_root_volumes.assert_not_called()
5270
5271 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
5272 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
5273 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
5274 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
5275 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
5276 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
5277 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
5278 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
5279 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
5280 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
5281 def test_process_vdu_params_add_persistent_root_disk_raises_exception(
5282 self,
5283 mock_prepare_vdu_affinity_group_list,
5284 mock_add_persistent_ordinary_disks_to_disk_list,
5285 mock_add_persistent_root_disk_to_disk_list,
5286 mock_find_persistent_volumes,
5287 mock_find_persistent_root_volumes,
5288 mock_prepare_vdu_ssh_keys,
5289 mock_prepare_vdu_cloud_init,
5290 mock_prepare_vdu_interfaces,
5291 mock_locate_vdu_interfaces,
5292 mock_sort_vdu_interfaces,
5293 ):
5294 """Add persistent root disk method raises exception."""
5295 target_vdu = deepcopy(target_vdu_wth_persistent_storage)
5296
5297 self.maxDiff = None
5298 target_vdu["interfaces"] = interfaces_wthout_positions
5299 mock_prepare_vdu_cloud_init.return_value = {}
5300 mock_prepare_vdu_affinity_group_list.return_value = []
5301 mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError
5302 new_kwargs = deepcopy(kwargs)
5303 new_kwargs.update(
5304 {
5305 "vnfr_id": vnfr_id,
5306 "nsr_id": nsr_id,
5307 "tasks_by_target_record_id": {},
5308 "logger": "logger",
5309 }
5310 )
5311
5312 vnfd = deepcopy(vnfd_wth_persistent_storage)
5313 db.get_one.return_value = vnfd
5314 with self.assertRaises(Exception) as err:
5315 Ns._process_vdu_params(
5316 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
5317 )
5318 self.assertEqual(type(err), KeyError)
5319 mock_sort_vdu_interfaces.assert_not_called()
5320 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
5321 mock_prepare_vdu_cloud_init.assert_called_once()
5322 mock_add_persistent_root_disk_to_disk_list.assert_called_once()
5323 mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
5324 mock_prepare_vdu_interfaces.assert_called_once_with(
5325 target_vdu,
5326 {
5327 "depends_on": [
5328 f"{ns_preffix}:image.0",
5329 f"{ns_preffix}:flavor.0",
5330 ]
5331 },
5332 ns_preffix,
5333 vnf_preffix,
5334 "logger",
5335 {},
5336 [],
5337 )
5338
5339 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
5340 mock_prepare_vdu_affinity_group_list.assert_not_called()
5341 mock_find_persistent_volumes.assert_not_called()
5342 mock_find_persistent_root_volumes.assert_not_called()
5343
5344 def test_select_persistent_root_disk(self):
5345 vdu = deepcopy(target_vdu_wth_persistent_storage)
5346 vdu["virtual-storage-desc"] = [
5347 "persistent-root-volume",
5348 "persistent-volume2",
5349 "ephemeral-volume",
5350 ]
5351 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
5352 expected_result = vsd
5353 result = Ns._select_persistent_root_disk(vsd, vdu)
5354 self.assertEqual(result, expected_result)
5355
5356 def test_select_persistent_root_disk_first_vsd_is_different(self):
5357 """VDU first virtual-storage-desc is different than vsd id."""
5358 vdu = deepcopy(target_vdu_wth_persistent_storage)
5359 vdu["virtual-storage-desc"] = [
5360 "persistent-volume2",
5361 "persistent-root-volume",
5362 "ephemeral-volume",
5363 ]
5364 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
5365 expected_result = None
5366 result = Ns._select_persistent_root_disk(vsd, vdu)
5367 self.assertEqual(result, expected_result)
5368
5369 def test_select_persistent_root_disk_vsd_is_not_persistent(self):
5370 """vsd type is not persistent."""
5371 vdu = deepcopy(target_vdu_wth_persistent_storage)
5372 vdu["virtual-storage-desc"] = [
5373 "persistent-volume2",
5374 "persistent-root-volume",
5375 "ephemeral-volume",
5376 ]
5377 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
5378 vsd["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
5379 expected_result = None
5380 result = Ns._select_persistent_root_disk(vsd, vdu)
5381 self.assertEqual(result, expected_result)
5382
5383 def test_select_persistent_root_disk_vsd_does_not_have_size(self):
5384 """vsd size is None."""
5385 vdu = deepcopy(target_vdu_wth_persistent_storage)
5386 vdu["virtual-storage-desc"] = [
5387 "persistent-volume2",
5388 "persistent-root-volume",
5389 "ephemeral-volume",
5390 ]
5391 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
5392 vsd["size-of-storage"] = None
5393 expected_result = None
5394 result = Ns._select_persistent_root_disk(vsd, vdu)
5395 self.assertEqual(result, expected_result)
5396
5397 def test_select_persistent_root_disk_vdu_wthout_vsd(self):
5398 """VDU does not have virtual-storage-desc."""
5399 vdu = deepcopy(target_vdu_wth_persistent_storage)
5400 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
5401 expected_result = None
5402 result = Ns._select_persistent_root_disk(vsd, vdu)
5403 self.assertEqual(result, expected_result)
5404
5405 def test_select_persistent_root_disk_invalid_vsd_type(self):
5406 """vsd is list, expected to be a dict."""
5407 vdu = deepcopy(target_vdu_wth_persistent_storage)
5408 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
5409 with self.assertRaises(AttributeError):
5410 Ns._select_persistent_root_disk(vsd, vdu)