Reformat files according to new black validation
[osm/RO.git] / NG-RO / osm_ng_ro / tests / test_ns.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from copy import deepcopy
18 import unittest
19 from unittest.mock import MagicMock, Mock, patch
20
21 from jinja2 import (
22 Environment,
23 select_autoescape,
24 StrictUndefined,
25 TemplateError,
26 TemplateNotFound,
27 UndefinedError,
28 )
29 from osm_ng_ro.ns import Ns, NsException
30
31
# Module metadata: author name and date stamp recorded for this test module.
__author__ = "Eduardo Sousa"
__date__ = "$19-NOV-2021 00:00:00$"
34
35
# Variables used in Tests

# VNFD fixture with a single VDU ("several_volumes-VM") that references three
# virtual-storage descriptors: two typed as persistent-storage (the root one
# additionally carries keep-volume=true) and one typed as ephemeral-storage.
vnfd_wth_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
        }
    ],
    "id": "several_volumes-vnf",
    "product-name": "several_volumes-vnf",
    "vdu": [
        {
            "id": "several_volumes-VM",
            "name": "several_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": [
                "ubuntu20.04-aws",
                "ubuntu20.04-azure",
            ],
            # Ids below refer to entries of the top-level "virtual-storage-desc".
            "virtual-storage-desc": [
                "persistent-root-volume",
                "persistent-volume2",
                "ephemeral-volume",
            ],
        }
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {
            "id": "persistent-volume2",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
        },
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {
            "fs": "mongo",
            "path": "/app/storage/",
        },
        "type": "vnfd",
    },
}
# Identifier string used as a VIM volume id by the tests.
vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3"
# Task lookup keyed by target record id; the single entry declares an
# "SR-IOV" net_type inside its extra_dict params.
task_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {"params": {"net_type": "SR-IOV"}}
    }
}
# Three VDU interfaces, none of which carries a "position" key.
interfaces_wthout_positions = [
    {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
    },
    {
        "name": "vdu-eth2",
        "ns-vld-id": "net2",
    },
    {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
    },
]
# The same three interfaces, each with an explicit "position" (2, 0, 1), so
# list order and position order deliberately differ.
interfaces_wth_all_positions = [
    {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "position": 2,
    },
    {
        "name": "vdu-eth2",
        "ns-vld-id": "net2",
        "position": 0,
    },
    {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
        "position": 1,
    },
]
# Target VDU record for "several_volumes-VM": one management interface and
# three virtual storages (two persistent-storage entries, one of them with
# keep-volume=true, plus one ephemeral-storage entry).
target_vdu_wth_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "several_volumes-VM",
    "interfaces": [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        }
    ],
    "virtual-storages": [
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        },
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
# Stand-ins for the database and file-storage handles.
# NOTE(review): fs reuses the mock name "database mock" -- looks like a
# copy-paste of the line above; confirm whether a distinct name was intended.
db = MagicMock(name="database mock")
fs = MagicMock(name="database mock")
# Record-id prefixes and bare ids; the prefixes are "nsrs:" / "vnfrs:" followed
# by the corresponding id below.
ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
indata = {
    "name": "sample_name",
}
# Expected extra_dict for the "several_volumes-VM" VDU: depends on image 0 and
# flavor 0 of the NS record, with empty disk/net/affinity lists.
expected_extra_dict = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "description": "several_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volu-several_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}

# Same shape as expected_extra_dict, but for the "without_volumes-VM" VDU
# (only "description" and "name" differ).
expected_extra_dict2 = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "description": "without_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volu-without_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}
# Holds the same content as task_by_target_record_id, as a separate object.
tasks_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {
            "params": {
                "net_type": "SR-IOV",
            }
        }
    }
}
# Keyword arguments bundle: a fresh db mock, an empty vdu2cloud_init cache and
# a minimal VNFR whose "vnfd-id" matches the "_id" of the VNFD fixtures.
kwargs = {
    "db": MagicMock(),
    "vdu2cloud_init": {},
    "vnfr": {
        "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "member-vnf-index-ref": "vnf-several-volumes",
    },
}
# VNFD fixture with a single VDU ("without_volumes-VM") whose storages are a
# root volume with no "type-of-storage" key and one ephemeral-storage volume;
# no entry is typed as persistent-storage.
vnfd_wthout_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
        }
    ],
    "id": "without_volumes-vnf",
    "product-name": "without_volumes-vnf",
    "vdu": [
        {
            "id": "without_volumes-VM",
            "name": "without_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": [
                "ubuntu20.04-aws",
                "ubuntu20.04-azure",
            ],
            # Ids below refer to entries of the top-level "virtual-storage-desc".
            "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
        }
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {"id": "root-volume", "size-of-storage": "10"},
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {
            "fs": "mongo",
            "path": "/app/storage/",
        },
        "type": "vnfd",
    },
}
265
# Target VDU record for "without_volumes-VM": one management interface, a root
# volume without "type-of-storage" and one ephemeral-storage volume.
target_vdu_wthout_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "without_volumes-VM",
    "interfaces": [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        }
    ],
    "virtual-storages": [
        {
            "id": "root-volume",
            "size-of-storage": "10",
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
# cloud-init text containing Jinja2-style placeholders ({{type}},
# {{is_override}}, {{command}}).
# NOTE(review): any indentation inside these two multi-line literals is not
# visible in this view and is part of the runtime string -- confirm the exact
# whitespace against the repository before relying on this text.
cloud_init_content = """
disk_setup:
ephemeral0:
table_type: {{type}}
layout: True
overwrite: {{is_override}}
runcmd:
- [ ls, -l, / ]
- [ sh, -xc, "echo $(date) '{{command}}'" ]
"""

# Same text with the placeholders replaced by "mbr", "False" and
# "& rm -rf /" respectively.
user_data = """
disk_setup:
ephemeral0:
table_type: mbr
layout: True
overwrite: False
runcmd:
- [ ls, -l, / ]
- [ sh, -xc, "echo $(date) '& rm -rf /'" ]
"""
310
311
class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments of every call.

    A plain mock stores references to its call arguments, so a caller that
    mutates an argument after the call also changes what the mock appears to
    have been called with. Copying at call time freezes the arguments as they
    were when the call happened.
    """

    def __call__(self, *args, **kwargs):
        """Deep-copy positional and keyword arguments, then delegate to MagicMock.

        Returns:
            Whatever the underlying MagicMock call returns.
        """
        args = deepcopy(args)
        kwargs = deepcopy(kwargs)
        # Zero-argument super() replaces the legacy super(CopyingMock, self) form.
        return super().__call__(*args, **kwargs)
317
318
319 class TestNs(unittest.TestCase):
def setUp(self):
    """No per-test preparation is performed."""
    return None
322
def test__create_task_without_extra_dict(self):
    """Without extra_dict the task is expected to hold only the base fields.

    The call is also expected to leave deployment_info["task_index"] at 2.
    """
    deploy_ctx = {"action_id": "123456", "nsr_id": "654321", "task_index": 1}

    created = Ns._create_task(
        deployment_info=deploy_ctx,
        target_id="vim_openstack_1",
        item="test_item",
        action="CREATE",
        target_record="test_target_record",
        target_record_id="test_target_record_id",
    )

    # task_index started at 1 and must read 2 after the call.
    self.assertEqual(deploy_ctx.get("task_index"), 2)
    self.assertDictEqual(
        created,
        {
            "target_id": "vim_openstack_1",
            "action_id": "123456",
            "nsr_id": "654321",
            "task_id": "123456:1",
            "status": "SCHEDULED",
            "action": "CREATE",
            "item": "test_item",
            "target_record": "test_target_record",
            "target_record_id": "test_target_record_id",
        },
    )
352
def test__create_task(self):
    """With extra_dict the task is expected to also contain the extra entries.

    The call is also expected to leave deployment_info["task_index"] at 2.
    """
    deploy_ctx = {"action_id": "123456", "nsr_id": "654321", "task_index": 1}
    extras = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }

    created = Ns._create_task(
        deployment_info=deploy_ctx,
        target_id="vim_openstack_1",
        item="test_item",
        action="CREATE",
        target_record="test_target_record",
        target_record_id="test_target_record_id",
        extra_dict=extras,
    )

    # task_index started at 1 and must read 2 after the call.
    self.assertEqual(deploy_ctx.get("task_index"), 2)
    self.assertDictEqual(
        created,
        {
            "target_id": "vim_openstack_1",
            "action_id": "123456",
            "nsr_id": "654321",
            "task_id": "123456:1",
            "status": "SCHEDULED",
            "action": "CREATE",
            "item": "test_item",
            "target_record": "test_target_record",
            "target_record_id": "test_target_record_id",
            # values coming from extra_dict
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": "test_depends_on",
        },
    )
391
@patch("osm_ng_ro.ns.time")
def test__create_ro_task(self, mock_time: Mock):
    """The RO task is expected to wrap the task with lock, VIM and time fields.

    osm_ng_ro.ns.time is patched to return a fixed value, which the three
    timestamp fields of the result are expected to equal.
    """
    frozen_now = 1637324838.994551
    mock_time.return_value = frozen_now
    task = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
        # values coming from extra_dict
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }
    untouched_vim_info = {
        "created": False,
        "created_items": None,
        "vim_id": None,
        "vim_name": None,
        "vim_status": None,
        "vim_details": None,
        "vim_message": None,
        "refresh_at": None,
    }
    wanted = {
        "_id": "123456:1",
        "locked_by": None,
        "locked_at": 0.0,
        "target_id": "vim_openstack_1",
        "vim_info": untouched_vim_info,
        "modified_at": frozen_now,
        "created_at": frozen_now,
        "to_check_at": frozen_now,
        "tasks": [task],
    }

    produced = Ns._create_ro_task(
        target_id="vim_openstack_1",
        task=task,
    )

    self.assertDictEqual(produced, wanted)
438
def test__process_image_params_with_empty_target_image(self):
    """An empty target image is expected to give an empty find_params."""
    outcome = Ns._process_image_params(
        target_image={},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual({"find_params": {}}, outcome)
453
def test__process_image_params_with_wrong_target_image(self):
    """A target image holding only an unrelated key is expected to give an empty find_params."""
    unrelated_image = {"no_image": "to_see_here"}

    outcome = Ns._process_image_params(
        target_image=unrelated_image,
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual({"find_params": {}}, outcome)
470
def test__process_image_params_with_image(self):
    """An "image" entry is expected to become a filter on "name"."""
    wanted = {"find_params": {"filter_dict": {"name": "cirros"}}}

    outcome = Ns._process_image_params(
        target_image={"image": "cirros"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(wanted, outcome)
491
def test__process_image_params_with_vim_image_id(self):
    """A "vim_image_id" entry is expected to become a filter on "id"."""
    wanted = {"find_params": {"filter_dict": {"id": "123456"}}}

    outcome = Ns._process_image_params(
        target_image={"vim_image_id": "123456"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(wanted, outcome)
512
def test__process_image_params_with_image_checksum(self):
    """An "image_checksum" entry is expected to become a filter on "checksum"."""
    digest = "e3fc50a88d0a364313df4b21ef20c29e"
    wanted = {"find_params": {"filter_dict": {"checksum": digest}}}

    outcome = Ns._process_image_params(
        target_image={"image_checksum": digest},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(wanted, outcome)
533
def test__get_resource_allocation_params_with_empty_target_image(self):
    """An empty quota descriptor is expected to give an empty result."""
    outcome = Ns._get_resource_allocation_params(quota_descriptor={})

    self.assertDictEqual({}, outcome)
543
def test__get_resource_allocation_params_with_wrong_target_image(self):
    """A descriptor holding only an unrelated key is expected to give an empty result."""
    unrelated_descriptor = {"no_quota": "present_here"}

    outcome = Ns._get_resource_allocation_params(
        quota_descriptor=unrelated_descriptor,
    )

    self.assertDictEqual({}, outcome)
555
def test__get_resource_allocation_params_with_limit(self):
    """A string "limit" is expected to come back as the integer 10."""
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"limit": "10"},
    )

    self.assertDictEqual({"limit": 10}, outcome)
569
def test__get_resource_allocation_params_with_reserve(self):
    """A string "reserve" is expected to come back as the integer 20."""
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"reserve": "20"},
    )

    self.assertDictEqual({"reserve": 20}, outcome)
583
def test__get_resource_allocation_params_with_shares(self):
    """A string "shares" is expected to come back as the integer 30."""
    outcome = Ns._get_resource_allocation_params(
        quota_descriptor={"shares": "30"},
    )

    self.assertDictEqual({"shares": 30}, outcome)
597
def test__get_resource_allocation_params(self):
    """limit, reserve and shares together are expected to come back as integers."""
    string_quota = {"limit": "10", "reserve": "20", "shares": "30"}
    wanted = {"limit": 10, "reserve": 20, "shares": 30}

    outcome = Ns._get_resource_allocation_params(quota_descriptor=string_quota)

    self.assertDictEqual(wanted, outcome)
615
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
    self,
    resource_allocation,
):
    """Empty quota with epa_vcpu_set=True: empty result, helper never called."""
    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={},
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
632
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """Empty quota with epa_vcpu_set=False: empty result, helper never called."""
    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota={},
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
649
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
    self,
    resource_allocation,
):
    """Unrelated key with epa_vcpu_set=True: empty result, helper never called."""
    unrelated_quota = {"no-quota": "nothing"}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=unrelated_quota,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
668
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """Unrelated key with epa_vcpu_set=False: empty result, helper never called."""
    unrelated_quota = {"no-quota": "nothing"}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=unrelated_quota,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
687
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
    self,
    resource_allocation,
):
    """cpu-quota with epa_vcpu_set=True: empty result, helper never called."""
    cpu_only_quota = {
        "cpu-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=cpu_only_quota,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, outcome)
    self.assertFalse(resource_allocation.called)
710
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """cpu-quota with epa_vcpu_set=False: helper called once, result under "cpu-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    cpu_only_quota = {
        "cpu-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"cpu-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=cpu_only_quota,
        epa_vcpu_set=False,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
750
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
    self,
    resource_allocation,
):
    """mem-quota with epa_vcpu_set=True: helper called once, result under "mem-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    mem_only_quota = {
        "mem-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"mem-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=mem_only_quota,
        epa_vcpu_set=True,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
790
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """mem-quota with epa_vcpu_set=False: helper called once, result under "mem-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    mem_only_quota = {
        "mem-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"mem-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=mem_only_quota,
        epa_vcpu_set=False,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
830
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
    self,
    resource_allocation,
):
    """disk-io-quota with epa_vcpu_set=True: helper called once, result under "disk-io-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    disk_only_quota = {
        "disk-io-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=disk_only_quota,
        epa_vcpu_set=True,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
870
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """disk-io-quota with epa_vcpu_set=False: helper called once, result under "disk-io-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    disk_only_quota = {
        "disk-io-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=disk_only_quota,
        epa_vcpu_set=False,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
910
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
    self,
    resource_allocation,
):
    """vif-quota with epa_vcpu_set=True: helper called once, result under "vif-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    vif_only_quota = {
        "vif-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"vif-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=vif_only_quota,
        epa_vcpu_set=True,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
950
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """vif-quota with epa_vcpu_set=False: helper called once, result under "vif-quota"."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    vif_only_quota = {
        "vif-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {"vif-quota": {"limit": 10, "reserve": 20, "shares": 30}}

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=vif_only_quota,
        epa_vcpu_set=False,
    )

    # The helper must have received the raw (string-valued) descriptor.
    resource_allocation.assert_called_once_with(
        {"limit": "10", "reserve": "20", "shares": "30"}
    )
    self.assertDictEqual(wanted, outcome)
990
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_quota_epa_cpu(
    self,
    resource_allocation,
):
    """All four quotas with epa_vcpu_set=True: every quota except "cpu-quota" is expected."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    every_quota = {
        "cpu-quota": {"limit": "10", "reserve": "20", "shares": "30"},
        "mem-quota": {"limit": "10", "reserve": "20", "shares": "30"},
        "disk-io-quota": {"limit": "10", "reserve": "20", "shares": "30"},
        "vif-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {
        "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
    }

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=every_quota,
        epa_vcpu_set=True,
    )

    self.assertTrue(resource_allocation.called)
    self.assertDictEqual(wanted, outcome)
1050
@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
    self,
    resource_allocation,
):
    """All four quotas with epa_vcpu_set=False: all four are expected in the result."""
    resource_allocation.return_value = {"limit": 10, "reserve": 20, "shares": 30}
    every_quota = {
        "cpu-quota": {"limit": "10", "reserve": "20", "shares": "30"},
        "mem-quota": {"limit": "10", "reserve": "20", "shares": "30"},
        "disk-io-quota": {"limit": "10", "reserve": "20", "shares": "30"},
        "vif-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }
    wanted = {
        "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "mem-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30},
        "vif-quota": {"limit": 10, "reserve": 20, "shares": 30},
    }

    outcome = Ns._process_guest_epa_quota_params(
        guest_epa_quota=every_quota,
        epa_vcpu_set=False,
    )

    self.assertTrue(resource_allocation.called)
    self.assertDictEqual(wanted, outcome)
1115
def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    """Empty input is expected to give an empty NUMA list and a False vcpu flag."""
    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota={},
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)
1126
def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    """An unrelated key is expected to give an empty NUMA list and a False vcpu flag."""
    unrelated_input = {"no_nume": "here"}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_input,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)
1138
def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    """An empty numa-node-policy is expected to give an empty list and a False flag."""
    empty_policy = {"numa-node-policy": {}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_policy,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)
1150
def test__process_guest_epa_numa_params_with_no_node(self):
    """A policy with an empty node list is expected to give an empty list and a False flag."""
    policy_without_nodes = {"numa-node-policy": {"node": []}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy_without_nodes,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)
1166
def test__process_guest_epa_numa_params_with_1_node_num_cores(self):
    """A node with num-cores is expected to give "cores" and a True vcpu flag."""
    single_node = {"num-cores": 3}
    policy = {"numa-node-policy": {"node": [single_node]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy,
    )

    self.assertEqual([{"cores": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)
1186
def test__process_guest_epa_numa_params_with_1_node_paired_threads(self):
    """num-paired-threads "3" is expected to give paired_threads 3 and a True vcpu flag."""
    single_node = {"paired-threads": {"num-paired-threads": "3"}}
    policy = {"numa-node-policy": {"node": [single_node]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy,
    )

    self.assertEqual([{"paired_threads": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)
1206
def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self):
    """paired-thread-ids are expected as string tuples, with a False vcpu flag."""
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    single_node = {"paired-threads": {"paired-thread-ids": thread_pairs}}
    policy = {"numa-node-policy": {"node": [single_node]}}
    wanted_nodes = [{"paired-threads-id": [("0", "1"), ("4", "5")]}]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy,
    )

    self.assertEqual(wanted_nodes, numa_nodes)
    self.assertEqual(False, vcpu_flag)
1241
def test__process_guest_epa_numa_params_with_1_node_num_threads(self):
    """num-threads "3" is expected to give threads 3 and a True vcpu flag."""
    single_node = {"num-threads": "3"}
    policy = {"numa-node-policy": {"node": [single_node]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy,
    )

    self.assertEqual([{"threads": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)
1261
def test__process_guest_epa_numa_params_with_1_node_memory_mb(self):
    """memory-mb 2048 is expected to give memory 2 and a False vcpu flag."""
    single_node = {"memory-mb": 2048}
    policy = {"numa-node-policy": {"node": [single_node]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy,
    )

    self.assertEqual([{"memory": 2}], numa_nodes)
    self.assertEqual(False, vcpu_flag)
1281
def test__process_guest_epa_numa_params_with_1_node_vcpu(self):
    """String node and vcpu ids are expected as integers, with a False vcpu flag."""
    policy = {
        "numa-node-policy": {
            "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
        },
    }
    wanted_nodes = [{"id": 0, "vcpu": [0, 1]}]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy,
    )

    self.assertEqual(wanted_nodes, numa_nodes)
    self.assertEqual(False, vcpu_flag)
1302
def test__process_guest_epa_numa_params_with_2_node_vcpu(self):
    """Two NUMA nodes, each carrying an id and two vCPU entries.

    The test expects one output entry per node, with integer ids, and the
    vCPU-set flag to equal False.
    """
    nodes = [
        {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
        {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
    ]
    wanted_numa = [
        {"id": 0, "vcpu": [0, 1]},
        {"id": 1, "vcpu": [2, 3]},
    ]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": nodes}},
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(False, vcpu_set)
1331
def test__process_guest_epa_numa_params_with_1_node(self):
    """One NUMA node declaring cores, paired threads, threads and memory.

    The test expects all four settings in a single output entry, the thread
    pairs as tuples of strings, and the vCPU-set flag to equal True.
    """
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": thread_pairs,
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    wanted_numa = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        }
    ]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": [node]}},
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(True, vcpu_set)
1376
def test__process_guest_epa_numa_params_with_2_nodes(self):
    """Two fully specified NUMA nodes.

    The test expects one output entry per node, in input order, and the
    vCPU-set flag to equal True.
    """

    def numa_node(cores, paired, pairs, threads, memory_mb):
        # Build one descriptor-style NUMA node entry.
        return {
            "num-cores": cores,
            "paired-threads": {
                "num-paired-threads": paired,
                "paired-thread-ids": [
                    {"thread-a": first, "thread-b": second}
                    for first, second in pairs
                ],
            },
            "num-threads": threads,
            "memory-mb": memory_mb,
        }

    nodes = [
        numa_node(3, "3", [(0, 1), (4, 5)], "3", 2048),
        numa_node(7, "7", [(2, 3), (5, 6)], "4", 4096),
    ]
    wanted_numa = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        },
        {
            "cores": 7,
            "paired_threads": 7,
            "paired-threads-id": [("2", "3"), ("5", "6")],
            "threads": 4,
            "memory": 4,
        },
    ]

    numa, vcpu_set = Ns._process_guest_epa_numa_params(
        guest_epa_quota={"numa-node-policy": {"node": nodes}},
    )

    self.assertEqual(wanted_numa, numa)
    self.assertEqual(True, vcpu_set)
1444
def test__process_guest_epa_cpu_pinning_params_with_empty_params(self):
    """Empty guest-epa input: expects an empty result and the flag still False."""
    numa, vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota={},
        vcpu_count=0,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, numa)
    self.assertEqual(False, vcpu_set)
1460
def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self):
    """Input without a ``cpu-pinning-policy`` key: expects nothing to be produced."""
    unrelated_epa = {"no-cpu-pinning-policy": "here"}

    numa, vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=unrelated_epa,
        vcpu_count=0,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, numa)
    self.assertEqual(False, vcpu_set)
1478
def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self):
    """DEDICATED pinning with the vCPU-set flag already True on input.

    The test expects an empty result and the flag to stay True.
    """
    numa, vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota={"cpu-pinning-policy": "DEDICATED"},
        vcpu_count=0,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, numa)
    self.assertEqual(True, vcpu_set)
1496
def test__process_guest_epa_cpu_pinning_params_with_threads(self):
    """DEDICATED pinning combined with a PREFER thread policy.

    The test expects the vCPU count to be reported as ``threads`` and the
    vCPU-set flag to equal True.
    """
    pinning = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
    }

    numa, vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, numa)
    self.assertEqual(True, vcpu_set)
1515
def test__process_guest_epa_cpu_pinning_params(self):
    """DEDICATED pinning without a thread policy.

    The test expects the vCPU count to be reported as ``cores`` and the
    vCPU-set flag to equal True.
    """
    numa, vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota={"cpu-pinning-policy": "DEDICATED"},
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"cores": 3}, numa)
    self.assertEqual(True, vcpu_set)
1533
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_empty_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Empty target flavor: expects an empty result and no helper invoked."""
    outcome = Ns._process_epa_params(target_flavor={})

    self.assertDictEqual({}, outcome)
    # None of the three patched helpers may have been reached.
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertFalse(helper.called)
1554
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_wrong_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Flavor without a ``guest-epa`` key: expects an empty result and no helper invoked."""
    outcome = Ns._process_epa_params(
        target_flavor={"no-guest-epa": "here"},
    )

    self.assertDictEqual({}, outcome)
    # None of the three patched helpers may have been reached.
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertFalse(helper.called)
1577
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Guest EPA with a STRICT memory policy and helpers that return nothing.

    The test expects only ``mem-policy`` in the result and all three patched
    helpers to have been called.
    """
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    guest_epa = {
        "vcpu-count": 1,
        "numa-node-policy": {"mem-policy": "STRICT"},
    }

    outcome = Ns._process_epa_params(
        target_flavor={"guest-epa": guest_epa},
    )

    self.assertDictEqual({"mem-policy": "STRICT"}, outcome)
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertTrue(helper.called)
1611
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_mempage_size(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Guest EPA with a memory-page size and a STRICT memory policy.

    The test expects both values in the result and all three patched helpers
    to have been called.
    """
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    guest_epa = {
        "vcpu-count": 1,
        "mempage-size": "1G",
        "numa-node-policy": {"mem-policy": "STRICT"},
    }
    wanted = {"mempage-size": "1G", "mem-policy": "STRICT"}

    outcome = Ns._process_epa_params(
        target_flavor={"guest-epa": guest_epa},
    )

    self.assertDictEqual(wanted, outcome)
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertTrue(helper.called)
1647
@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_numa(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Guest EPA with NUMA, pinning policies, page size and all four quotas.

    The three helpers are patched to return canned values; the test expects
    them combined into one result and every helper to have been called.
    """
    quota_names = ("cpu-quota", "mem-quota", "disk-io-quota", "vif-quota")

    # Canned helper outputs. Fresh objects are built here so they are never
    # shared with the expected result below.
    guest_epa_numa_params.return_value = (
        [
            {
                "cores": 3,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
                "memory": 2,
            },
        ],
        True,
    )
    guest_epa_cpu_pinning_params.return_value = ({"threads": 3}, True)
    guest_epa_quota_params.return_value = {
        quota: {"limit": 10, "reserve": 20, "shares": 30} for quota in quota_names
    }

    numa_node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": [
                {"thread-a": 0, "thread-b": 1},
                {"thread-a": 4, "thread-b": 5},
            ],
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    guest_epa = {
        "vcpu-count": 1,
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numa-node-policy": {"node": [numa_node]},
    }
    for quota in quota_names:
        guest_epa[quota] = {"limit": "10", "reserve": "20", "shares": "30"}

    wanted = {
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numas": [
            {
                "cores": 3,
                "memory": 2,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
            }
        ],
    }
    for quota in quota_names:
        wanted[quota] = {"limit": 10, "reserve": 20, "shares": 30}

    outcome = Ns._process_epa_params(
        target_flavor={"guest-epa": guest_epa},
    )

    self.assertEqual(wanted, outcome)
    for helper in (
        guest_epa_numa_params,
        guest_epa_cpu_pinning_params,
        guest_epa_quota_params,
    ):
        self.assertTrue(helper.called)
1774
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_target_flavor(
    self,
    epa_params,
):
    """Empty target flavor: expects ``KeyError`` and the EPA helper never called."""
    vnf_only_indata = {
        "vnf": [
            {"vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18"},
        ],
    }

    with self.assertRaises(KeyError):
        Ns._process_flavor_params(
            target_flavor={},
            indata=vnf_only_indata,
            vim_info={},
            target_record_id="",
        )

    self.assertFalse(epa_params.called)
1800
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_target_flavor(
    self,
    epa_params,
):
    """Flavor with only an unrelated key: expects ``KeyError`` and no EPA helper call."""
    unrelated_flavor = {"no-target-flavor": "here"}

    with self.assertRaises(KeyError):
        Ns._process_flavor_params(
            target_flavor=unrelated_flavor,
            indata={},
            vim_info={},
            target_record_id="",
        )

    self.assertFalse(epa_params.called)
1822
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_indata(
    self,
    epa_params,
):
    """Valid flavor with empty ``indata``.

    The test expects the string sizes as integers in both ``find_params`` and
    ``params`` (only the latter carrying the name) and the EPA helper called.
    """
    epa_params.return_value = {}

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2}
    wanted = {
        "find_params": {"flavor_data": dict(sizes)},
        "params": {"flavor_data": dict(sizes, name="test")},
    }
    flavor = {
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata={},
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
1866
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_indata(
    self,
    epa_params,
):
    """Valid flavor with ``indata`` that has no ``vnf`` key.

    The test expects the same output as for empty ``indata`` and the EPA
    helper to have been called.
    """
    epa_params.return_value = {}

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2}
    wanted = {
        "find_params": {"flavor_data": dict(sizes)},
        "params": {"flavor_data": dict(sizes, name="test")},
    }
    flavor = {
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata={"no-vnf": "here"},
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
1912
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_ephemeral_disk(
    self,
    epa_params,
):
    """A VDU record that carries a 10 GB ephemeral virtual storage.

    The test expects ``ephemeral: 10`` in both flavor dicts. The VNFD is
    served through ``db.get_one``.
    NOTE(review): ``db`` is defined outside this view; presumably a shared
    mock object - confirm.
    """
    ephemeral_type = "etsi-nfv-descriptors:ephemeral-storage"
    vnfd = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [
            {
                "id": "without_volumes-VM",
                "name": "without_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {"id": "root-volume", "size-of-storage": "10"},
            {
                "id": "ephemeral-volume",
                "type-of-storage": ephemeral_type,
                "size-of-storage": "1",
            },
        ],
        "_admin": {
            "storage": {"fs": "mongo", "path": "/app/storage/"},
            "type": "vnfd",
        },
    }
    db.get_one.return_value = vnfd
    epa_params.return_value = {}

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2, "ephemeral": 10}
    wanted = {
        "find_params": {"flavor_data": dict(sizes)},
        "params": {"flavor_data": dict(sizes, name="test")},
    }
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {"type-of-storage": ephemeral_type, "size-of-storage": "10"},
        ],
    }
    ns_indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
2022
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_swap_disk(
    self,
    epa_params,
):
    """A VDU record that carries a 20 GB swap virtual storage.

    The test expects ``swap: 20`` in both flavor dicts and the EPA helper to
    have been called.
    """
    epa_params.return_value = {}

    sizes = {"disk": 10, "ram": 1024, "vcpus": 2, "swap": 20}
    wanted = {
        "find_params": {"flavor_data": dict(sizes)},
        "params": {"flavor_data": dict(sizes, name="test")},
    }
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                "size-of-storage": "20",
            },
        ],
    }
    ns_indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
2086
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_persistent_root_disk(
    self,
    epa_params,
):
    """A VDU whose only storage is a persistent root volume.

    The test expects ``disk: 0`` in both flavor dicts even though the flavor
    asks for 10 GB. The VNFD is served through ``db.get_one``.
    NOTE(review): ``db`` is defined outside this view; presumably a shared
    mock object - confirm.
    """
    persistent_type = "persistent-storage:persistent-storage"
    vnfd = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "several_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "several_volumes-vnf",
        "product-name": "several_volumes-vnf",
        "vdu": [
            {
                "id": "several_volumes-VM",
                "name": "several_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": [
                    "persistent-root-volume",
                ],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {
                "id": "persistent-root-volume",
                "type-of-storage": persistent_type,
                "size-of-storage": "10",
            },
        ],
        "_admin": {
            "storage": {"fs": "mongo", "path": "/app/storage/"},
            "type": "vnfd",
        },
    }
    db.get_one.return_value = vnfd
    epa_params.return_value = {}

    sizes = {"disk": 0, "ram": 1024, "vcpus": 2}
    wanted = {
        "find_params": {"flavor_data": dict(sizes)},
        "params": {"flavor_data": dict(sizes, name="test")},
    }
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "vdu-name": "several_volumes-VM",
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {"type-of-storage": persistent_type, "size-of-storage": "10"},
        ],
    }
    ns_indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
2196
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_epa_params(
    self,
    epa_params,
):
    """The patched EPA helper returns a non-empty dict.

    The test expects that dict under ``extended`` in both flavor dicts.
    """
    epa_params.return_value = {"numa": "there-is-numa-here"}

    def flavor_data(**extra):
        # Fresh expected flavor dict on every call, so nothing is shared.
        data = {
            "disk": 10,
            "ram": 1024,
            "vcpus": 2,
            "extended": {"numa": "there-is-numa-here"},
        }
        data.update(extra)
        return data

    wanted = {
        "find_params": {"flavor_data": flavor_data()},
        "params": {"flavor_data": flavor_data(name="test")},
    }
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    ns_indata = {
        "vnf": [
            {
                "vdur": [
                    {"ns-flavor-id": "test_id"},
                ],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
2260
@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params(
    self,
    epa_params,
):
    """Ephemeral storage, swap storage and EPA output all at once.

    The test expects ``ephemeral``, ``swap`` and ``extended`` in both flavor
    dicts. The VNFD is served through ``db.get_one``.
    NOTE(review): ``db`` is defined outside this view; presumably a shared
    mock object - confirm.
    """
    ephemeral_type = "etsi-nfv-descriptors:ephemeral-storage"
    vnfd = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [
            {
                "id": "without_volumes-VM",
                "name": "without_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {"id": "root-volume", "size-of-storage": "10"},
            {
                "id": "ephemeral-volume",
                "type-of-storage": ephemeral_type,
                "size-of-storage": "1",
            },
        ],
        "_admin": {
            "storage": {"fs": "mongo", "path": "/app/storage/"},
            "type": "vnfd",
        },
    }
    db.get_one.return_value = vnfd
    epa_params.return_value = {"numa": "there-is-numa-here"}

    def flavor_data(**extra):
        # Fresh expected flavor dict on every call, so nothing is shared.
        data = {
            "disk": 10,
            "ram": 1024,
            "vcpus": 2,
            "ephemeral": 10,
            "swap": 20,
            "extended": {"numa": "there-is-numa-here"},
        }
        data.update(extra)
        return data

    wanted = {
        "find_params": {"flavor_data": flavor_data()},
        "params": {"flavor_data": flavor_data(name="test")},
    }
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {"type-of-storage": ephemeral_type, "size-of-storage": "10"},
            {
                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                "size-of-storage": "20",
            },
        ],
    }
    ns_indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(outcome, wanted)
2385
def test__ip_profile_to_ro_with_none(self):
    """A ``None`` IP profile is expected to convert to ``None``."""
    converted = Ns._ip_profile_to_ro(ip_profile=None)

    self.assertIsNone(converted)
2394
def test__ip_profile_to_ro_with_empty_profile(self):
    """An empty IP profile dict is expected to convert to ``None``."""
    converted = Ns._ip_profile_to_ro(ip_profile={})

    self.assertIsNone(converted)
2403
def test__ip_profile_to_ro_with_wrong_profile(self):
    """A profile holding only an unrelated key.

    The test expects an IPv4 profile with DHCP disabled and every address
    field set to ``None``.
    """
    wanted = {
        "ip_version": "IPv4",
        "subnet_address": None,
        "gateway_address": None,
        "dhcp_enabled": False,
        "dhcp_start_address": None,
        "dhcp_count": None,
    }

    converted = Ns._ip_profile_to_ro(
        ip_profile={"no-profile": "here"},
    )

    self.assertDictEqual(wanted, converted)
2422
def test__ip_profile_to_ro_with_ipv4_profile(self):
    """A complete IPv4 profile with DHCP parameters.

    The test expects underscore-style keys, the version spelled "IPv4" and
    the DHCP parameters flattened into the top level.
    """
    dhcp = {"enabled": True, "start-address": "192.168.0.10", "count": 25}
    profile = {
        "ip-version": "ipv4",
        "subnet-address": "192.168.0.0/24",
        "gateway-address": "192.168.0.254",
        "dhcp-params": dhcp,
    }
    wanted = {
        "ip_version": "IPv4",
        "subnet_address": "192.168.0.0/24",
        "gateway_address": "192.168.0.254",
        "dhcp_enabled": True,
        "dhcp_start_address": "192.168.0.10",
        "dhcp_count": 25,
    }

    converted = Ns._ip_profile_to_ro(ip_profile=profile)

    self.assertDictEqual(wanted, converted)
2448
def test__ip_profile_to_ro_with_ipv6_profile(self):
    """A complete IPv6 profile with DHCP parameters.

    The test expects the version spelled "IPv6" and the addresses passed
    through unchanged.
    """
    dhcp = {"enabled": True, "start-address": "2001:0200:0001::0010", "count": 25}
    profile = {
        "ip-version": "ipv6",
        "subnet-address": "2001:0200:0001::/48",
        "gateway-address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
        "dhcp-params": dhcp,
    }
    wanted = {
        "ip_version": "IPv6",
        "subnet_address": "2001:0200:0001::/48",
        "gateway_address": "2001:0200:0001:ffff:ffff:ffff:ffff:fffe",
        "dhcp_enabled": True,
        "dhcp_start_address": "2001:0200:0001::0010",
        "dhcp_count": 25,
    }

    converted = Ns._ip_profile_to_ro(ip_profile=profile)

    self.assertDictEqual(wanted, converted)
2474
def test__ip_profile_to_ro_with_dns_server(self):
    """An IPv4 profile listing three DNS servers.

    The test expects the addresses joined with ";" under ``dns_address``,
    in input order.
    """
    profile = {
        "ip-version": "ipv4",
        "subnet-address": "192.168.0.0/24",
        "gateway-address": "192.168.0.254",
        "dhcp-params": {
            "enabled": True,
            "start-address": "192.168.0.10",
            "count": 25,
        },
        "dns-server": [
            {"address": server} for server in ("8.8.8.8", "1.1.1.1", "1.0.0.1")
        ],
    }
    wanted = {
        "ip_version": "IPv4",
        "subnet_address": "192.168.0.0/24",
        "gateway_address": "192.168.0.254",
        "dhcp_enabled": True,
        "dhcp_start_address": "192.168.0.10",
        "dhcp_count": 25,
        "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
    }

    converted = Ns._ip_profile_to_ro(ip_profile=profile)

    self.assertDictEqual(wanted, converted)
2512
def test__ip_profile_to_ro_with_security_group(self):
    """An IPv4 profile carrying a security group.

    The test expects the group content unchanged under ``security_group``.
    """
    profile = {
        "ip-version": "ipv4",
        "subnet-address": "192.168.0.0/24",
        "gateway-address": "192.168.0.254",
        "dhcp-params": {
            "enabled": True,
            "start-address": "192.168.0.10",
            "count": 25,
        },
        "security-group": {"some-security-group": "here"},
    }
    wanted = {
        "ip_version": "IPv4",
        "subnet_address": "192.168.0.0/24",
        "gateway_address": "192.168.0.254",
        "dhcp_enabled": True,
        "dhcp_start_address": "192.168.0.10",
        "dhcp_count": 25,
        "security_group": {"some-security-group": "here"},
    }

    converted = Ns._ip_profile_to_ro(ip_profile=profile)

    self.assertDictEqual(wanted, converted)
2544
def test__ip_profile_to_ro(self):
    """An IPv4 profile with DHCP, three DNS servers and a security group.

    The test expects every section to appear in the converted profile.
    """
    profile = {
        "ip-version": "ipv4",
        "subnet-address": "192.168.0.0/24",
        "gateway-address": "192.168.0.254",
        "dhcp-params": {
            "enabled": True,
            "start-address": "192.168.0.10",
            "count": 25,
        },
        "dns-server": [
            {"address": server} for server in ("8.8.8.8", "1.1.1.1", "1.0.0.1")
        ],
        "security-group": {"some-security-group": "here"},
    }
    wanted = {
        "ip_version": "IPv4",
        "subnet_address": "192.168.0.0/24",
        "gateway_address": "192.168.0.254",
        "dhcp_enabled": True,
        "dhcp_start_address": "192.168.0.10",
        "dhcp_count": 25,
        "dns_address": "8.8.8.8;1.1.1.1;1.0.0.1",
        "security_group": {"some-security-group": "here"},
    }

    converted = Ns._ip_profile_to_ro(ip_profile=profile)

    self.assertDictEqual(wanted, converted)
2588
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_empty_params(
    self,
    ip_profile_to_ro,
):
    """A plain VLD whose VIM info only names a provider network.

    The test expects a bridge network named "ns-name-vld-name" whose IP
    profile is whatever the patched ``_ip_profile_to_ro`` returns.
    """
    ip_profile_to_ro.return_value = {"some_ip_profile": "here"}
    wanted = {
        "params": {
            "net_name": "ns-name-vld-name",
            "net_type": "bridge",
            "ip_profile": {"some_ip_profile": "here"},
            "provider_network_profile": "some-profile-here",
        }
    }

    outcome = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info={"provider_network": "some-profile-here"},
        target_record_id="",
    )

    self.assertDictEqual(wanted, outcome)
    self.assertTrue(ip_profile_to_ro.called)
2628
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_vim_info_sdn(
    self,
    ip_profile_to_ro,
):
    """VIM info flagged as SDN, without a target VIM.

    The test expects only the SDN ports, VLDs and type under ``params`` and
    the IP profile conversion never called.
    """
    sdn_vim_info = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "type": "sdn-type",
    }
    wanted = {
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "type": "sdn-type",
        }
    }

    outcome = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info=sdn_vim_info,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(wanted, outcome)
    self.assertFalse(ip_profile_to_ro.called)
2664
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_vim_info_sdn_target_vim(self, ip_profile_to_ro):
    """SDN ``vim_info`` carrying ``target_vim`` is expected to add ``depends_on``.

    Besides the pass-through ``params``, the result must contain a
    ``depends_on`` entry of ``"some-vim vld.sdn"``; no IP profile translation
    may happen.
    """
    wanted = {
        "depends_on": ["some-vim vld.sdn"],
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "target_vim": "some-vim",
            "type": "sdn-type",
        },
    }
    sdn_vim_info = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "target_vim": "some-vim",
        "type": "sdn-type",
    }

    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info=sdn_vim_info,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(wanted, result)
    self.assertFalse(ip_profile_to_ro.called)
2703
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_vim_network_name(self, ip_profile_to_ro):
    """``vim_network_name`` is expected to become a ``name`` lookup filter.

    The result must hold only ``find_params`` with the network name in
    ``filter_dict``; the IP profile translation must not be invoked.
    """
    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info={"vim_network_name": "some-network-name"},
        target_record_id="vld.sdn.something",
    )

    wanted = {"find_params": {"filter_dict": {"name": "some-network-name"}}}
    self.assertDictEqual(wanted, result)
    self.assertFalse(ip_profile_to_ro.called)
2736
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_vim_network_id(self, ip_profile_to_ro):
    """``vim_network_id`` is expected to become an ``id`` lookup filter.

    The result must hold only ``find_params`` with the network id in
    ``filter_dict``; the IP profile translation must not be invoked.
    """
    result = Ns._process_net_params(
        target_vld={"name": "vld-name"},
        indata={"name": "ns-name"},
        vim_info={"vim_network_id": "some-network-id"},
        target_record_id="vld.sdn.something",
    )

    wanted = {"find_params": {"filter_dict": {"id": "some-network-id"}}}
    self.assertDictEqual(wanted, result)
    self.assertFalse(ip_profile_to_ro.called)
2769
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_mgmt_network(self, ip_profile_to_ro):
    """A VLD flagged as ``mgmt-network`` is expected to be looked up, not created.

    With an empty ``vim_info`` the result must be ``find_params`` marked
    ``mgmt`` and named after the VLD ``id``; the IP profile translation must
    not be invoked.
    """
    mgmt_vld = {
        "id": "vld-id",
        "name": "vld-name",
        "mgmt-network": "some-mgmt-network",
    }

    result = Ns._process_net_params(
        target_vld=mgmt_vld,
        indata={"name": "ns-name"},
        vim_info={},
        target_record_id="vld.sdn.something",
    )

    wanted = {"find_params": {"mgmt": True, "name": "vld-id"}}
    self.assertDictEqual(wanted, result)
    self.assertFalse(ip_profile_to_ro.called)
2801
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_underlay_eline(self, ip_profile_to_ro):
    """An underlay VLD of type ``ELINE`` is expected to map to ``ptp``.

    The patched ``_ip_profile_to_ro`` must have been called, and its return
    value is expected under ``params["ip_profile"]``.
    """
    ip_profile_to_ro.return_value = {"some_ip_profile": "here"}
    eline_vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELINE",
    }

    result = Ns._process_net_params(
        target_vld=eline_vld,
        indata={"name": "ns-name"},
        vim_info={"provider_network": "some-profile-here"},
        target_record_id="",
    )

    wanted = {
        "params": {
            "ip_profile": {"some_ip_profile": "here"},
            "net_name": "ns-name-vld-name",
            "net_type": "ptp",
            "provider_network_profile": "some-profile-here",
        }
    }
    self.assertDictEqual(wanted, result)
    self.assertTrue(ip_profile_to_ro.called)
2843
@patch("osm_ng_ro.ns.Ns._ip_profile_to_ro")
def test__process_net_params_with_underlay_elan(self, ip_profile_to_ro):
    """An underlay VLD of type ``ELAN`` is expected to map to ``data``.

    The patched ``_ip_profile_to_ro`` must have been called, and its return
    value is expected under ``params["ip_profile"]``.
    """
    ip_profile_to_ro.return_value = {"some_ip_profile": "here"}
    elan_vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELAN",
    }

    result = Ns._process_net_params(
        target_vld=elan_vld,
        indata={"name": "ns-name"},
        vim_info={"provider_network": "some-profile-here"},
        target_record_id="",
    )

    wanted = {
        "params": {
            "ip_profile": {"some_ip_profile": "here"},
            "net_name": "ns-name-vld-name",
            "net_type": "data",
            "provider_network_profile": "some-profile-here",
        }
    }
    self.assertDictEqual(wanted, result)
    self.assertTrue(ip_profile_to_ro.called)
2885
def test__get_cloud_init_exception(self):
    """An empty ``location`` is expected to make ``_get_cloud_init`` raise."""
    database = MagicMock(name="database mock")

    self.assertRaises(
        NsException,
        Ns._get_cloud_init,
        db=database,
        fs=None,
        location="",
    )
2894
def test__get_cloud_init_file_fs_exception(self):
    """A ``file`` location with ``fs=None`` is expected to raise ``NsException``."""
    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {"folder": "/home/osm", "pkg-dir": "vnfr_test_dir"},
        },
    }

    self.assertRaises(
        NsException,
        Ns._get_cloud_init,
        db=database,
        fs=None,
        location="vnfr_id_123456:file:test_file",
    )
2911
def test__get_cloud_init_file(self):
    """A ``file`` location is expected to return what the opened file yields.

    The filesystem mock hands out a context-manager file mock whose ``read``
    returns the cloud-init text.
    """
    wanted_content = "this is a cloud init file content"

    opened_file = MagicMock(name="file mock")
    opened_file.__enter__.return_value.read.return_value = wanted_content
    filesystem = MagicMock(name="filesystem mock")
    filesystem.file_open.return_value = opened_file

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {"folder": "/home/osm", "pkg-dir": "vnfr_test_dir"},
        },
    }

    result = Ns._get_cloud_init(
        db=database, fs=filesystem, location="vnfr_id_123456:file:test_file"
    )

    self.assertEqual(wanted_content, result)
2934
def test__get_cloud_init_vdu(self):
    """A ``vdu`` location is expected to return the ``cloud-init`` of that VDU.

    The database mock returns a record whose ``vdu`` entry ``0`` carries the
    cloud-init text; no filesystem is supplied.
    """
    wanted_content = "this is a cloud init file content"
    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "vdu": {0: {"cloud-init": wanted_content}},
    }

    result = Ns._get_cloud_init(
        db=database, fs=None, location="vnfr_id_123456:vdu:0"
    )

    self.assertEqual(wanted_content, result)
2953
@patch("jinja2.Environment.__init__")
def test__parse_jinja2_undefined_error(self, env_mock: Mock):
    """``UndefinedError`` from the jinja2 ``Environment`` must become ``NsException``."""
    env_mock.side_effect = UndefinedError("UndefinedError occurred.")

    self.assertRaises(
        NsException,
        Ns._parse_jinja2,
        cloud_init_content=None,
        params=None,
        context=None,
    )
2966
@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_error(self, env_mock: Mock):
    """``TemplateError`` from the jinja2 ``Environment`` must become ``NsException``."""
    env_mock.side_effect = TemplateError("TemplateError occurred.")

    self.assertRaises(
        NsException,
        Ns._parse_jinja2,
        cloud_init_content=None,
        params=None,
        context=None,
    )
2979
@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_not_found(self, env_mock: Mock):
    """``TemplateNotFound`` from the jinja2 ``Environment`` must become ``NsException``."""
    env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")

    self.assertRaises(
        NsException,
        Ns._parse_jinja2,
        cloud_init_content=None,
        params=None,
        context=None,
    )
2992
def test_rendering_jinja2_temp_without_special_characters(self):
    """Render a cloud-init template whose parameters need no escaping.

    Every ``{{...}}`` placeholder is expected to be replaced by the matching
    entry of ``params`` and the rest of the text left untouched.
    """
    # Template and expected text share the same layout; only the
    # placeholders differ.
    cloud_init_content = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    params = {
        "type": "mbr",
        "is_override": "False",
        "command": "; mkdir abc",
    }
    context = "cloud-init for VM"
    expected_result = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
    """
    result = Ns._parse_jinja2(
        cloud_init_content=cloud_init_content, params=params, context=context
    )
    self.assertEqual(result, expected_result)
3024
def test_rendering_jinja2_temp_with_special_characters(self):
    """A parameter containing ``&`` must not be rendered verbatim.

    ``expected_result`` is the unescaped rendering of the template. The
    ``command`` parameter is exactly the text embedded in it, so the
    rendered output can differ only if ``_parse_jinja2`` alters the
    special character.
    """
    cloud_init_content = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    params = {
        "type": "mbr",
        "is_override": "False",
        # Must match the command inside expected_result; a different command
        # would make assertNotEqual pass regardless of escaping.
        "command": "& rm -rf /",
    }
    context = "cloud-init for VM"
    expected_result = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
    """
    result = Ns._parse_jinja2(
        cloud_init_content=cloud_init_content, params=params, context=context
    )
    self.assertNotEqual(result, expected_result)
3056
def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self):
    """With autoescaping switched off, ``&`` is expected to render verbatim.

    ``osm_ng_ro.ns.Environment`` is patched to return a real jinja2
    ``Environment`` built with autoescape disabled, so the rendered text
    must equal the template with the raw ``command`` substituted.
    """
    # The patch must stay active while _parse_jinja2 runs, so the whole
    # test body lives inside the context manager.
    with patch("osm_ng_ro.ns.Environment") as mock_environment:
        mock_environment.return_value = Environment(
            undefined=StrictUndefined,
            autoescape=select_autoescape(default_for_string=False, default=False),
        )
        cloud_init_content = """
        disk_setup:
            ephemeral0:
                table_type: {{type}}
                layout: True
                overwrite: {{is_override}}
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '{{command}}'" ]
        """
        params = {
            "type": "mbr",
            "is_override": "False",
            "command": "& rm -rf /",
        }
        context = "cloud-init for VM"
        expected_result = """
        disk_setup:
            ephemeral0:
                table_type: mbr
                layout: True
                overwrite: False
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
        """
        result = Ns._parse_jinja2(
            cloud_init_content=cloud_init_content,
            params=params,
            context=context,
        )
        self.assertEqual(result, expected_result)
3095
@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task(self, assign_vim):
    """``rebuild_start_stop_task`` is expected to build an ``update`` EXEC task.

    The call is repeated for ``start``, ``stop`` and ``rebuild``; only the
    ``action`` entry inside ``params`` differs between the expected tasks.
    """
    self.ns = Ns()
    vim_vm_id = "f37b18ef-3caa-4dc9-ab91-15c669b16396"
    vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
    vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
    action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
    nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
    target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    vdu_index = "0"
    task_index = 0
    extra_dict = {}

    for action in ("start", "stop", "rebuild"):
        wanted_task = {
            "target_id": target_vim,
            "action_id": action_id,
            "nsr_id": nsr_id,
            "task_id": f"{action_id}:{task_index}",
            "status": "SCHEDULED",
            "action": "EXEC",
            "item": "update",
            "target_record": f"vnfrs:{vnf_id}:vdur.{vdu_index}",
            "target_record_id": f"vnfrs:{vnf_id}:vdur.{vdu_id}",
            "params": {"vim_vm_id": vim_vm_id, "action": action},
        }
        # The same extra_dict object is reused; only "params" is replaced.
        extra_dict["params"] = {"vim_vm_id": vim_vm_id, "action": action}

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id,
            task_index,
            target_vim,
            extra_dict,
        )

        self.assertEqual(task.get("action_id"), action_id)
        self.assertEqual(task.get("nsr_id"), nsr_id)
        self.assertEqual(task.get("target_id"), target_vim)
        self.assertDictEqual(task, wanted_task)
3143
@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task(self, assign_vim):
    """``verticalscale_task`` is expected to build a ``verticalscale`` EXEC task.

    The target id is expected to be the VIM key found in the VDU's
    ``vim_info``, and ``params`` the ones supplied through ``extra_dict``.
    """
    self.ns = Ns()
    vim_key = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
    vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
    vim_vm_id = "f37b18ef-3caa-4dc9-ab91-15c669b16396"
    action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
    nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
    vdu_index = "1"
    task_index = 1

    wanted_task = {
        "target_id": vim_key,
        "action_id": action_id,
        "nsr_id": nsr_id,
        "task_id": f"{action_id}:{task_index}",
        "status": "SCHEDULED",
        "action": "EXEC",
        "item": "verticalscale",
        "target_record": f"vnfrs:{vnf_id}:vdur.{vdu_index}",
        "target_record_id": f"vnfrs:{vnf_id}:vdur.{vdu_id}",
        "params": {"vim_vm_id": vim_vm_id, "flavor_dict": "flavor_dict"},
    }
    vdu = {"id": vdu_id, "vim_info": {vim_key: {"interfaces": []}}}
    vnf = {"_id": vnf_id}
    extra_dict = {
        "params": {"vim_vm_id": vim_vm_id, "flavor_dict": "flavor_dict"},
    }

    task = self.ns.verticalscale_task(
        vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
    )

    self.assertDictEqual(task, wanted_task)
3188
@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task(self, assign_vim):
    """``migrate_task`` is expected to build a ``migrate`` EXEC task.

    The target id is expected to be the VIM key found in the VDU's
    ``vim_info``, and ``params`` the ones supplied through ``extra_dict``.
    """
    self.ns = Ns()
    vim_key = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
    vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
    vim_vm_id = "f37b18ef-3caa-4dc9-ab91-15c669b16396"
    action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
    nsr_id = "993166fe-723e-4680-ac4b-b1af2541ae31"
    vdu_index = "1"
    task_index = 1

    wanted_task = {
        "target_id": vim_key,
        "action_id": action_id,
        "nsr_id": nsr_id,
        "task_id": f"{action_id}:{task_index}",
        "status": "SCHEDULED",
        "action": "EXEC",
        "item": "migrate",
        "target_record": f"vnfrs:{vnf_id}:vdur.{vdu_index}",
        "target_record_id": f"vnfrs:{vnf_id}:vdur.{vdu_id}",
        "params": {"vim_vm_id": vim_vm_id, "migrate_host": "migrateToHost"},
    }
    vdu = {"id": vdu_id, "vim_info": {vim_key: {"interfaces": []}}}
    vnf = {"_id": vnf_id}
    extra_dict = {
        "params": {"vim_vm_id": vim_vm_id, "migrate_host": "migrateToHost"},
    }

    task = self.ns.migrate_task(
        vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
    )

    self.assertDictEqual(task, wanted_task)
3233
3234
3235 class TestProcessVduParams(unittest.TestCase):
def setUp(self):
    """Give every test a fresh ``Ns`` instance and a ``CopyingMock`` logger."""
    self.logger = CopyingMock(autospec=True)
    self.ns = Ns()
3239
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """Persistent root volume lookup with an empty instantiation volume list."""
    mock_volume_keeping_required.return_value = True
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    disks = []

    root_disk = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(
        root_disk,
        {
            "persistent-root-volume": {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": True,
            }
        },
    )
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
            "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
        }
    )
    self.assertEqual(
        disks, [{"image_id": "ubuntu20.04", "size": "10", "keep": True}]
    )
    self.assertEqual(len(disks), 1)
3277
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
    self, mock_volume_keeping_required
):
    """The first virtual storage listed on the VDU is expected to be the root volume."""
    mock_volume_keeping_required.return_value = True
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    # Put the ordinary persistent volume ahead of the nominal root volume.
    descriptor["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    disks = []

    root_disk = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(
        root_disk,
        {
            "persistent-volume2": {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": True,
            }
        },
    )
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
        }
    )
    self.assertEqual(
        disks, [{"image_id": "ubuntu20.04", "size": "10", "keep": True}]
    )
    self.assertEqual(len(disks), 1)
3319
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_size_of_storage(
    self, mock_volume_keeping_required
):
    """No root volume is expected when the first storage has an empty size."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    descriptor["virtual-storage-desc"][0]["size-of-storage"] = ""
    disks = []

    root_disk = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(root_disk, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disks, [])
3341
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_keeping_is_not_required(
    self, mock_volume_keeping_required
):
    """Root volume is expected with ``keep`` False when keeping is not required."""
    mock_volume_keeping_required.return_value = False
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
        {"key": "keep-volume", "value": "false"},
    ]
    disks = []

    root_disk = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(
        root_disk,
        {
            "persistent-root-volume": {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": False,
            }
        },
    )
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
            "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
        }
    )
    self.assertEqual(
        disks, [{"image_id": "ubuntu20.04", "size": "10", "keep": False}]
    )
    self.assertEqual(len(disks), 1)
3382
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_target_vdu_mismatch(
    self, mock_volume_keeping_required
):
    """No root volume is expected when the descriptor VDU name differs in case."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["vdu"][0]["name"] = "Several_Volumes-VM"
    disks = []

    outcome = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(outcome, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disks, [])
    self.assertEqual(len(disks), 0)
3400
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_with_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """An existing VIM volume named at instantiation is expected as root volume."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    instantiation_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-root-volume"}
    ]
    disks = []

    root_disk = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, instantiation_volumes, disks
    )

    self.assertEqual(
        root_disk,
        {
            "persistent-root-volume": {
                "vim_volume_id": vim_volume_id,
                "image_id": "ubuntu20.04",
            },
        },
    )
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(
        disks, [{"vim_volume_id": vim_volume_id, "image_id": "ubuntu20.04"}]
    )
    self.assertEqual(len(disks), 1)
3434
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_invalid_instantiation_params(
    self, mock_volume_keeping_required
):
    """A misspelled volume id key at instantiation is expected to raise ``KeyError``."""
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    # "volume-id" is used here in place of "vim-volume-id".
    instantiation_volumes = [
        {"volume-id": vim_volume_id, "name": "persistent-root-volume"}
    ]
    disks = []

    self.assertRaises(
        KeyError,
        self.ns.find_persistent_root_volumes,
        descriptor,
        target_vdu_wth_persistent_storage,
        instantiation_volumes,
        disks,
    )

    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disks, [])
    self.assertEqual(len(disks), 0)
3456
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup with a root disk and no instantiation volumes."""
    mock_volume_keeping_required.return_value = False
    root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    disks = [{"image_id": "ubuntu20.04", "size": "10", "keep": False}]

    self.ns.find_persistent_volumes(
        root_disk, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(
        disks,
        [
            {"image_id": "ubuntu20.04", "size": "10", "keep": False},
            {"size": "10", "keep": False},
        ],
    )
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        }
    )
3500
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup with a ``vim-volume-id`` given at instantiation."""
    root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    instantiation_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume2"}
    ]
    disks = [{"image_id": "ubuntu20.04", "size": "10", "keep": False}]

    self.ns.find_persistent_volumes(
        root_disk, target_vdu_wth_persistent_storage, instantiation_volumes, disks
    )

    self.assertEqual(
        disks,
        [
            {"image_id": "ubuntu20.04", "size": "10", "keep": False},
            {"vim_volume_id": vim_volume_id},
        ],
    )
    mock_volume_keeping_required.assert_not_called()
3542
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wthout_persistent_storage(
    self, mock_volume_keeping_required
):
    """Find persistent ordinary volume, there is not any persistent disk.

    The disk list starts empty and is expected to stay empty, and the
    keep-volume check must never be consulted.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = target_vdu_wthout_persistent_storage
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # Compare against a fresh literal: checking disk_list against itself
    # could never fail, even if disks had been appended.
    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()
3558
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
    self, mock_volume_keeping_required
):
    """There is persistent root disk, but there is not ordinary persistent disk.

    The VDU only carries the root volume, which is already present in
    ``persistent_root_disk``, and an ephemeral volume. The disk list is
    expected to keep its single root entry, and the keep-volume check must
    never be consulted.
    """
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["virtual-storages"] = [
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ]
    disk_list = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    # Independent copy of the initial content: checking disk_list against
    # itself could never fail, even if the list had been modified.
    expected_disk_list = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    self.assertEqual(disk_list, expected_disk_list)
    mock_volume_keeping_required.assert_not_called()
3601
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup where the instantiation volume name is unknown.

    A ``vim-volume-id`` is supplied for ``persistent-volume3``, which the
    test expects not to be used: the appended disk carries ``size`` and
    ``keep`` instead of a VIM volume id.
    """
    mock_volume_keeping_required.return_value = True
    root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    instantiation_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume3"}
    ]
    disks = [{"image_id": "ubuntu20.04", "size": "10", "keep": False}]

    self.ns.find_persistent_volumes(
        root_disk, target_vdu_wth_persistent_storage, instantiation_volumes, disks
    )

    self.assertEqual(
        disks,
        [
            {"image_id": "ubuntu20.04", "size": "10", "keep": False},
            {"size": "10", "keep": True},
        ],
    )
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        }
    )
3652
def test_is_volume_keeping_required_true(self):
    """A ``keep-volume`` requirement of ``"true"`` is expected to yield True."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), True)
3665
def test_is_volume_keeping_required_false(self):
    """A ``keep-volume`` requirement of ``"false"`` is expected to yield False."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), False)
3678
def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """A descriptor without ``vdu-storage-requirements`` is expected to yield False."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), False)
3688
def test_is_volume_keeping_required_wrong_keyword(self):
    """A requirement keyed ``hold-volume`` instead of ``keep-volume`` is expected to yield False."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "hold-volume", "value": "true"}],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), False)
3701
def test_sort_vdu_interfaces_position_all_wth_positions(self):
    """Interfaces that all carry a position are expected in ascending position order."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 2},
        {"name": "vdu-eth0", "ns-vld-id": "mgmtnet", "position": 1},
    ]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(
        vdu["interfaces"],
        [
            {"name": "vdu-eth0", "ns-vld-id": "mgmtnet", "position": 1},
            {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 2},
        ],
    )
3731
def test_sort_vdu_interfaces_position_some_wth_position(self):
    """Only one interface has a position; it must come before the one without."""
    unpositioned = {"name": "vdu-eth0", "ns-vld-id": "mgmtnet"}
    positioned = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 1}
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Copies keep the expected list independent from what the method receives.
    target_vdu["interfaces"] = [dict(unpositioned), dict(positioned)]
    sorted_interfaces = [positioned, unpositioned]
    self.ns._sort_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], sorted_interfaces)
3759
def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """An empty interface list must stay empty after sorting."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = []
    self.ns._sort_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], [])
3767
def test_partially_locate_vdu_interfaces(self):
    """Two of four interfaces have positions (1 and 3); check where they end up."""
    at_position_1 = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 1}
    at_position_3 = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        dict(at_position_3),
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
        dict(at_position_1),
    ]
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    # Positions start from 1 here, so they are expected at list indexes 0 and 2.
    self.assertDictEqual(target_vdu["interfaces"][0], at_position_1)
    self.assertDictEqual(target_vdu["interfaces"][2], at_position_3)
3800
def test_partially_locate_vdu_interfaces_position_start_from_0(self):
    """Two of four interfaces have positions (0 and 3); numbering starts at 0."""
    at_position_0 = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 0}
    at_position_3 = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        dict(at_position_3),
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
        dict(at_position_0),
    ]
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    # With zero-based positions the list index is expected to equal the position.
    self.assertDictEqual(target_vdu["interfaces"][0], at_position_0)
    self.assertDictEqual(target_vdu["interfaces"][3], at_position_3)
3833
def test_partially_locate_vdu_interfaces_wthout_position(self):
    """No interface has a position; the interface list must be left unchanged."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Work on a private copy: the method under test rewrites
    # target_vdu["interfaces"], and the module-level fixture is shared with
    # other tests, so it must not be handed over directly.
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(target_vdu["interfaces"])
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_result)
3841
def test_partially_locate_vdu_interfaces_all_has_position(self):
    """Every interface has a position; the list must be ordered 0, 1, 2."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Work on a private copy: the method under test rewrites
    # target_vdu["interfaces"], and the module-level fixture is shared with
    # other tests, so it must not be handed over directly.
    target_vdu["interfaces"] = deepcopy(interfaces_wth_all_positions)
    expected_interfaces = [
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 0},
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet", "position": 1},
        {"name": "vdu-eth1", "ns-vld-id": "net1", "position": 2},
    ]
    self.ns._partially_locate_vdu_interfaces(target_vdu)
    self.assertEqual(target_vdu["interfaces"], expected_interfaces)
3865
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init(self, mock_parse_jinja2, mock_get_cloud_init):
    """Target VDU has both cloud-init and boot-data-drive set."""
    cloud_init_path = "sample-cloud-init-path"
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.return_value = user_data
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = cloud_init_path
    target_vdu["boot-data-drive"] = "vda"
    # Empty cache, so the content has to be fetched through _get_cloud_init.
    vdu2cloud_init = {}

    result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    self.assertDictEqual(result, {"user-data": user_data, "boot-data-drive": "vda"})
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location=cloud_init_path
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )
3890
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target VDU has cloud-init and boot-data-drive; _get_cloud_init raises."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.side_effect = NsException(
        "Mismatch descriptor for cloud init."
    )

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
    # Checked after the context manager exits: a statement placed after the
    # raising call inside the "with" block would never be executed.
    self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")

    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    # Parsing must not be attempted once fetching the content has failed.
    mock_parse_jinja2.assert_not_called()
3913
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target VDU has cloud-init and boot-data-drive; _parse_jinja2 raises."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.side_effect = NsException("Error parsing cloud-init content.")

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
    # Checked after the context manager exits: a statement placed after the
    # raising call inside the "with" block would never be executed.
    self.assertEqual(str(err.exception), "Error parsing cloud-init content.")

    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )
3938
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target VDU has cloud-init but no boot-data-drive."""
    cloud_init_path = "sample-cloud-init-path"
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.return_value = user_data
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = cloud_init_path
    vdu2cloud_init = {}

    result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    # Only the rendered user data is expected; no "boot-data-drive" entry.
    self.assertDictEqual(result, {"user-data": user_data})
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location=cloud_init_path
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )
3963
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Cloud-init content is already present in the vdu2cloud_init dict."""
    cloud_init_path = "sample-cloud-init-path"
    mock_parse_jinja2.return_value = user_data
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = cloud_init_path
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {cloud_init_path: cloud_init_content}

    result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    self.assertDictEqual(result, {"user-data": user_data, "boot-data-drive": "vda"})
    # The content came from vdu2cloud_init, so no fetch is expected.
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )
3987
@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_no_cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target VDU has boot-data-drive but no cloud-init entry."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}

    result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    self.assertDictEqual(result, {"boot-data-drive": "vda"})
    # Without cloud-init there is nothing to fetch or render.
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_not_called()
4004
def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self):
    """Both ns-vld-id and vnf-vld-id are set; the ns-level text is expected."""
    interface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    net_text = self.ns._check_vld_information_of_interfaces(
        interface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, f"{ns_preffix}:vld.mgmtnet")
4017
def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict must produce an empty string."""
    net_text = self.ns._check_vld_information_of_interfaces(
        {}, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, "")
4025
def test_check_vld_information_of_interfaces_has_only_vnf_vld(self):
    """Only vnf-vld-id is set; the vnf-level text is expected."""
    interface = {"name": "vdu-eth0", "vnf-vld-id": "mgmt_cp_int"}
    net_text = self.ns._check_vld_information_of_interfaces(
        interface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, f"{vnf_preffix}:vld.mgmt_cp_int")
4037
def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
    self,
):
    """Only vnf-vld-id is set and the vnf prefix is None; TypeError is expected."""
    interface = {
        "name": "vdu-eth0",
        "vnf-vld-id": "mgmt_cp_int",
    }
    # Assert the concrete exception type directly. The previous form caught any
    # Exception and then compared type(err) -- the type of the assertRaises
    # context manager, not of the raised exception -- against TypeError.
    with self.assertRaises(TypeError):
        self.ns._check_vld_information_of_interfaces(interface, ns_preffix, None)
4052
def test_prepare_interface_port_security_has_security_details(self):
    """Hyphenated port-security keys are expected to be replaced by underscore ones."""
    common = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    interface = dict(common)
    interface["port-security-enabled"] = True
    interface["port-security-disable-strategy"] = "allow-address-pairs"
    expected_interface = dict(common)
    expected_interface["port_security"] = True
    expected_interface["port_security_disable_strategy"] = "allow-address-pairs"
    # The method works in place; there is no return value to check.
    self.ns._prepare_interface_port_security(interface)
    self.assertDictEqual(interface, expected_interface)
4071
def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict must stay empty."""
    interface = {}
    self.ns._prepare_interface_port_security(interface)
    self.assertDictEqual(interface, {})
4078
def test_prepare_interface_port_security_wthout_port_security(self):
    """An interface without port-security keys must be left untouched."""
    interface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    # Snapshot taken before the call, to compare against afterwards.
    expected_interface = dict(interface)
    self.ns._prepare_interface_port_security(interface)
    self.assertDictEqual(interface, expected_interface)
4093
def test_create_net_item_of_interface_floating_ip_port_security(self):
    """Interface carries floating ip and port-security details."""
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    net_text = f"{ns_preffix}"
    net_item = self.ns._create_net_item_of_interface(interface, net_text)
    # "vcpi" and the two vld ids are not expected in the resulting net item.
    self.assertDictEqual(
        net_item,
        {
            "name": "vdu-eth0",
            "port_security": True,
            "port_security_disable_strategy": "allow-address-pairs",
            "floating_ip": "10.1.1.12",
            "net_id": f"TASK-{ns_preffix}",
            "type": "virtual",
        },
    )
4116
def test_create_net_item_of_interface_invalid_net_text(self):
    """Passing None as net_text must raise TypeError."""
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    with self.assertRaises(TypeError):
        self.ns._create_net_item_of_interface(interface, None)
4131
def test_create_net_item_of_interface_empty_interface(self):
    """An empty interface dict still yields the net_id and type entries."""
    net_item = self.ns._create_net_item_of_interface({}, ns_preffix)
    self.assertDictEqual(
        net_item,
        {"net_id": f"TASK-{ns_preffix}", "type": "virtual"},
    )
4142
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_sriov(self, mock_deep_get):
    """Interface type is SR-IOV and deep_get reports an existing net_type."""
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "SR-IOV",
    }
    mock_deep_get.return_value = "SR-IOV"
    # Private copy of the module-level fixture: this test expects the call below
    # to write "net_type" into the structure it receives (asserted further down),
    # and that write must not leak into other tests sharing the fixture.
    tasks = deepcopy(tasks_by_target_record_id)
    net_text = ns_preffix
    net_item = {}
    expected_net_item = {
        "use": "data",
        "model": "SR-IOV",
        "type": "SR-IOV",
    }
    self.ns._prepare_type_of_interface(interface, tasks, net_text, net_item)
    self.assertDictEqual(net_item, expected_net_item)
    self.assertEqual(
        "data",
        tasks[net_text]["extra_dict"]["params"]["net_type"],
    )
    mock_deep_get.assert_called_once_with(
        tasks, net_text, "extra_dict", "params", "net_type"
    )
4175
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
    self, mock_deep_get
):
    """Interface type is PCI-PASSTHROUGH and deep_get returns an empty dict."""
    mock_deep_get.return_value = {}
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "PCI-PASSTHROUGH",
    }
    # Local, empty task map (shadows the module-level fixture on purpose).
    tasks_by_target_record_id = {}
    net_text = ns_preffix
    net_item = {}
    self.ns._prepare_type_of_interface(
        interface, tasks_by_target_record_id, net_text, net_item
    )
    self.assertDictEqual(
        net_item,
        {"use": "data", "model": "PCI-PASSTHROUGH", "type": "PCI-PASSTHROUGH"},
    )
    mock_deep_get.assert_called_once_with(
        tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type"
    )
4207
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get):
    """Interface type is OM-MGMT; the net item is expected to be marked "mgmt"."""
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "OM-MGMT",
    }
    # Local, empty task map (shadows the module-level fixture on purpose).
    tasks_by_target_record_id = {}
    net_item = {}
    self.ns._prepare_type_of_interface(
        interface, tasks_by_target_record_id, ns_preffix, net_item
    )
    self.assertDictEqual(net_item, {"use": "mgmt"})
    mock_deep_get.assert_not_called()
4232
@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_bridge(self, mock_deep_get):
    """Interface has no "type" key; the net item is expected to be a bridge."""
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    # Local, empty task map (shadows the module-level fixture on purpose).
    tasks_by_target_record_id = {}
    net_item = {}
    self.ns._prepare_type_of_interface(
        interface, tasks_by_target_record_id, ns_preffix, net_item
    )
    self.assertDictEqual(net_item, {"use": "bridge", "model": None})
    mock_deep_get.assert_not_called()
4257
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Three connected interfaces are processed with all helpers mocked."""
    interface_1 = {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "ip-address": "13.2.12.31",
        "mgmt-interface": True,
    }
    interface_2 = {
        "name": "vdu-eth2",
        "vnf-vld-id": "net2",
        "mac-address": "d0:94:66:ed:fc:e2",
    }
    interface_3 = {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"}
    interfaces = (interface_1, interface_2, interface_3)

    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = list(interfaces)
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    net_list = []

    net_texts = (
        f"{ns_preffix}:net1",
        f"{vnf_preffix}:net2",
        f"{ns_preffix}:mgmtnet",
    )
    net_items = tuple(
        {"name": iface["name"], "net_id": f"TASK-{ns_preffix}", "type": "virtual"}
        for iface in interfaces
    )
    mock_item_of_interface.side_effect = list(net_items)
    mock_vld_information_of_interface.side_effect = list(net_texts)

    # Expectations are built from copies taken before the call, because the
    # mocked net items themselves are handed to the method under test.
    expected_net_list = [
        {**net_items[0], "ip_address": "13.2.12.31"},
        {**net_items[1], "mac_address": "d0:94:66:ed:fc:e2"},
        net_items[2],
    ]
    expected_extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": list(net_texts),
        "mgmt_vdu_interface": 0,
    }

    self.ns._prepare_vdu_interfaces(
        target_vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for idx, iface in enumerate(interfaces):
        self.assertEqual(vld_calls[idx][0], (iface, ns_preffix, vnf_preffix))

    port_security_calls = mock_port_security.call_args_list
    for idx, iface in enumerate(interfaces):
        self.assertEqual(port_security_calls[idx].args[0], iface)

    item_calls = mock_item_of_interface.call_args_list
    for idx, iface in enumerate(interfaces):
        self.assertEqual(item_calls[idx][0], (iface, net_texts[idx]))

    type_calls = mock_type_of_interface.call_args_list
    for idx, iface in enumerate(interfaces):
        self.assertEqual(
            type_calls[idx][0],
            (iface, tasks_by_target_record_id, net_texts[idx], net_items[idx]),
        )

    self.assertEqual(net_list, expected_net_list)
    self.assertEqual(extra_dict, expected_extra_dict)
    self.logger.error.assert_not_called()
4380
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_create_net_item_raise_exception(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """_create_net_item_of_interface raises TypeError on the first interface."""
    interface_1 = {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "ip-address": "13.2.12.31",
        "mgmt-interface": True,
    }
    interface_2 = {
        "name": "vdu-eth2",
        "vnf-vld-id": "net2",
        "mac-address": "d0:94:66:ed:fc:e2",
    }
    interface_3 = {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"}
    first_net_text = f"{ns_preffix}:net1"

    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    net_list = []
    mock_item_of_interface.side_effect = [TypeError, TypeError, TypeError]
    mock_vld_information_of_interface.side_effect = [first_net_text]

    with self.assertRaises(TypeError):
        self.ns._prepare_vdu_interfaces(
            target_vdu,
            extra_dict,
            ns_preffix,
            vnf_preffix,
            self.logger,
            tasks_by_target_record_id,
            net_list,
        )

    vld_calls = mock_vld_information_of_interface.call_args_list
    self.assertEqual(vld_calls[0][0], (interface_1, ns_preffix, vnf_preffix))

    port_security_calls = mock_port_security.call_args_list
    self.assertEqual(port_security_calls[0].args[0], interface_1)

    item_calls = mock_item_of_interface.call_args_list
    self.assertEqual(item_calls[0][0], (interface_1, first_net_text))

    mock_type_of_interface.assert_not_called()
    self.logger.error.assert_not_called()
    self.assertEqual(net_list, [])
    # The first net text is expected in depends_on even though the call failed.
    self.assertEqual(
        extra_dict,
        {
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": [first_net_text],
        },
    )
4454
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_vld_information_is_empty(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """_check_vld_information_of_interfaces returns "" for every interface."""
    interface_1 = {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "ip-address": "13.2.12.31",
        "mgmt-interface": True,
    }
    interface_2 = {
        "name": "vdu-eth2",
        "vnf-vld-id": "net2",
        "mac-address": "d0:94:66:ed:fc:e2",
    }
    interface_3 = {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"}
    interfaces = (interface_1, interface_2, interface_3)

    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = list(interfaces)
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    net_list = []
    mock_vld_information_of_interface.side_effect = ["", "", ""]

    self.ns._prepare_vdu_interfaces(
        target_vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for idx, iface in enumerate(interfaces):
        self.assertEqual(vld_calls[idx][0], (iface, ns_preffix, vnf_preffix))

    # One error log per interface, in interface order.
    expected_errors = (
        "Interface 0 from vdu several_volumes-VM not connected to any vld",
        "Interface 1 from vdu several_volumes-VM not connected to any vld",
        "Interface 2 from vdu several_volumes-VM not connected to any vld",
    )
    error_calls = self.logger.error.call_args_list
    for idx, message in enumerate(expected_errors):
        self.assertEqual(error_calls[idx][0], (message,))

    self.assertEqual(net_list, [])
    self.assertEqual(
        extra_dict,
        {
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": [],
        },
    )

    # None of the per-interface helpers may run for an unconnected interface.
    mock_item_of_interface.assert_not_called()
    mock_port_security.assert_not_called()
    mock_type_of_interface.assert_not_called()
4543
@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_empty_interface_list(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """With an empty interface list none of the helpers may be called."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = []
    extra_dict, net_list = {}, []
    self.ns._prepare_vdu_interfaces(
        target_vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )
    mock_type_of_interface.assert_not_called()
    mock_vld_information_of_interface.assert_not_called()
    mock_item_of_interface.assert_not_called()
    mock_port_security.assert_not_called()
4573
def test_prepare_vdu_ssh_keys(self):
    """VDU has ssh-keys, ssh access is required and an RO public key is given."""
    ro_nsr_public_key = {"public_key": "path_of_public_key"}
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["ssh-keys"] = ["sample-ssh-key"]
    target_vdu["ssh-access-required"] = True
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
    # Both the VDU key and the RO public key entry are expected, in that order.
    self.assertDictEqual(
        cloud_config,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )
4586
def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """VDU has no ssh-keys of its own but ssh access is required."""
    ro_nsr_public_key = {"public_key": "path_of_public_key"}
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["ssh-access-required"] = True
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
    # Only the RO public key entry is expected.
    self.assertDictEqual(
        cloud_config, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )
4596
def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """VDU has ssh-keys but ssh access is not required."""
    ro_nsr_public_key = {"public_key": "path_of_public_key"}
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["ssh-keys"] = ["sample-ssh-key"]
    target_vdu["ssh-access-required"] = False
    cloud_config = {}
    self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
    # The RO public key must not be added when ssh access is not required.
    self.assertDictEqual(cloud_config, {"key-pairs": ["sample-ssh-key"]})
4607
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_false(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent root disk is added to disk_list with keep set to False."""
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.return_value = root_disk
    mock_volume_keeping_required.return_value = False
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk, disk_list = {}, []

    self.ns._add_persistent_root_disk_to_disk_list(
        vnfd, target_vdu, persistent_root_disk, disk_list
    )

    self.assertEqual(
        disk_list,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": False}],
    )
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_called_once()
4639
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """_select_persistent_root_disk raises AttributeError; nothing is added."""
    mock_select_persistent_root_disk.side_effect = AttributeError
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk, disk_list = {}, []

    with self.assertRaises(AttributeError):
        self.ns._add_persistent_root_disk_to_disk_list(
            vnfd, target_vdu, persistent_root_disk, disk_list
        )

    self.assertEqual(disk_list, [])
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_not_called()
4664
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_true(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Add persistent root disk, keep volume set to True."""
    root_vsd = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [
            {"key": "keep-volume", "value": "true"},
        ],
    }
    mock_select_persistent_root_disk.return_value = root_vsd
    mock_volume_keeping_required.return_value = True

    descriptor = deepcopy(vnfd_wth_persistent_storage)
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    root_disk_map, disks = {}, []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disk_map, disks
    )

    self.assertEqual(
        disks, [{"image_id": "ubuntu20.04", "size": "10", "keep": True}]
    )
    # The keep decision must be taken on the descriptor the selector returned.
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
4697
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list(
    self, mock_volume_keeping_required
):
    """Add persistent ordinary disk, keep volume set to False.

    The root volume is already registered in persistent_root_disk, so only
    the ordinary volume is expected to be appended to disk_list.
    """
    mock_volume_keeping_required.return_value = False
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    known_root_disks = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    ordinary_vsd = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    ordinary_disk_map, disks = {}, []

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        vdu, known_root_disks, ordinary_disk_map, disks
    )

    self.assertEqual(disks, [{"size": "10", "keep": False}])
    mock_volume_keeping_required.assert_called_once_with(ordinary_vsd)
4730
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
    self, mock_volume_keeping_required
):
    """Add persistent ordinary disk, vsd id is in root_disk dict.

    Because the ordinary volume id is already a key of persistent_root_disk,
    nothing is expected to be appended and the keep check is expected to be
    skipped.
    """
    mock_volume_keeping_required.return_value = False
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    known_root_disks = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        },
        "persistent-volume2": {
            "size": "10",
        },
    }
    ordinary_disk_map, disks = {}, []

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        vdu, known_root_disks, ordinary_disk_map, disks
    )

    self.assertEqual(disks, [])
    mock_volume_keeping_required.assert_not_called()
4756
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """VNFD does not have persistent storage.

    The selector is mocked to return None on every call, so disk_list is
    expected to stay empty.
    """
    mock_select_persistent_root_disk.return_value = None
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    root_disk_map, disks = {}, []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disk_map, disks
    )

    self.assertEqual(disks, [])
    # The test expects the selector to be consulted exactly twice for this fixture.
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()
4774
@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent_root_disk dict is empty.

    NOTE(review): this test uses the same fixtures and assertions as the
    vnfd_wthout_persistent_storage variant - confirm whether a different
    scenario was intended.
    """
    mock_select_persistent_root_disk.return_value = None
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    disks = []
    root_disk_map = {}

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disk_map, disks
    )

    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    self.assertEqual(disks, [])
4792
def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
    """Invalid extra dict.

    extra_dict has no "depends_on" entry, so an NsException carrying the
    message "Invalid extra_dict format." is expected.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
    extra_dict = {}
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)
    # The message is checked after the with block: a statement placed after
    # the raising call inside the block would never run, and err.exception
    # is only populated once the context manager has exited.
    self.assertEqual(str(err.exception), "Invalid extra_dict format.")
4802
def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """There is one affinity-group.

    The group's record id is expected to be appended to
    extra_dict["depends_on"] and returned with a "TASK-" prefix.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_record_id = (
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
        ":affinity-or-anti-affinity-group.sample_affinity-group-id"
    )
    extra_dict = {"depends_on": []}

    result = self.ns._prepare_vdu_affinity_group_list(vdu, extra_dict, ns_preffix)

    self.assertDictEqual(extra_dict, {"depends_on": [group_record_id]})
    self.assertEqual(result, [{"affinity_group_id": "TASK-" + group_record_id}])
4817
def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """There are two affinity-groups.

    Both record ids are expected in extra_dict["depends_on"], in the order
    the VDU lists them, and in the returned list with a "TASK-" prefix.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_record_ids = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
        ":affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
        ":affinity-or-anti-affinity-group.affinity-group-id2",
    ]
    extra_dict = {"depends_on": []}

    result = self.ns._prepare_vdu_affinity_group_list(vdu, extra_dict, ns_preffix)

    self.assertDictEqual(extra_dict, {"depends_on": group_record_ids})
    self.assertEqual(
        result,
        [{"affinity_group_id": "TASK-" + record_id} for record_id in group_record_ids],
    )
4839
def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """There is not any affinity-group.

    The fixture VDU is used untouched; an empty list is expected back and
    extra_dict is expected to be left as it was.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    extra_dict = {"depends_on": []}

    result = self.ns._prepare_vdu_affinity_group_list(vdu, extra_dict, ns_preffix)

    self.assertEqual(result, [])
    self.assertDictEqual(extra_dict, {"depends_on": []})
4850
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_with_inst_vol_list(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """An instantiation volume list is supplied through additionalParams.

    With OSM vdu_volumes present, find_persistent_volumes is expected to be
    called with that list, and neither add-to-disk-list helper is expected
    to run.
    """
    inst_volumes = [
        {
            "vim-volume-id": vim_volume_id,
            "name": "persistent-volume2",
        }
    ]
    root_volumes = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
        }
    }
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions
    vdu["additionalParams"] = {"OSM": {"vdu_volumes": inst_volumes}}

    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = root_volumes
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    wanted_extra_dict = deepcopy(expected_extra_dict)

    result = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    # Interface handling: every interface has a position, so only sorting runs.
    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        wanted_extra_dict,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    # Cloud-init, ssh keys and affinity groups.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # Volume handling goes through the lookup of existing volumes.
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_find_persistent_volumes.assert_called_once_with(
        root_volumes, vdu, inst_volumes, []
    )
    self.assertDictEqual(result, wanted_extra_dict)
4933
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_affinity_groups(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There are affinity groups.

    The mocked affinity group list is expected to show up under
    params["affinity_group_list"] of the returned extra dict.
    """
    self.maxDiff = None
    affinity_groups = [
        "affinity_group_1",
        "affinity_group_2",
    ]
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions

    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = affinity_groups
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected_extra_dict3 = deepcopy(expected_extra_dict2)
    expected_extra_dict3["params"]["affinity_group_list"] = [
        "affinity_group_1",
        "affinity_group_2",
    ]

    result = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    self.assertDictEqual(result, expected_extra_dict3)
    # Interface handling.
    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        expected_extra_dict3,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    # Cloud-init, ssh keys and affinity groups.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # Without an instantiation volume list the disks are added, not looked up.
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
5006
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_cloud_config(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There is cloud-config.

    The mocked cloud-init result is expected to be handed to the ssh-key
    preparation and to appear under params["cloud_config"] of the result.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions

    mock_prepare_vdu_cloud_init.return_value = {
        "user-data": user_data,
        "boot-data-drive": "vda",
    }
    mock_prepare_vdu_affinity_group_list.return_value = []
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected_extra_dict3 = deepcopy(expected_extra_dict2)
    expected_extra_dict3["params"]["cloud_config"] = {
        "user-data": user_data,
        "boot-data-drive": "vda",
    }

    result = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    # Interface handling.
    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        expected_extra_dict3,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    # Cloud-init output is forwarded to the ssh-key preparation.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_called_once_with(
        vdu, None, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # Volume handling.
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    self.assertDictEqual(result, expected_extra_dict3)
5080
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wthout_persistent_storage(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There is not any persistent storage.

    Both the VDU and the VNFD returned by the mocked db come from the
    fixtures without persistent storage.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions

    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    db.get_one.return_value = deepcopy(vnfd_wthout_persistent_storage)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    wanted_extra_dict = deepcopy(expected_extra_dict2)

    result = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    # Interface handling.
    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        wanted_extra_dict,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    # Cloud-init, ssh keys and affinity groups.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # Volume handling.
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    self.assertDictEqual(result, wanted_extra_dict)
5145
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_interfaces_partially_located(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Some interfaces have position.

    Only one of the three interfaces carries a position, so the partial
    locate helper is expected to run instead of the sort helper.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
        },
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    root_volumes = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = root_volumes
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }

    result = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    wanted_extra_dict = deepcopy(expected_extra_dict)
    # Interface handling.
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(vdu)
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        wanted_extra_dict,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    # Cloud-init, ssh keys and affinity groups.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # Volume handling.
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
    self.assertDictEqual(result, wanted_extra_dict)
5230
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_no_interface_position(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Interfaces do not have position.

    The partial locate helper is expected to be used and the sort helper is
    expected to be left alone.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = interfaces_wthout_positions
    root_volumes = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = root_volumes
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }

    result = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    wanted_extra_dict = deepcopy(expected_extra_dict)
    # Interface handling.
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(vdu)
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        wanted_extra_dict,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    # Cloud-init, ssh keys and affinity groups.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    # Volume handling.
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
    self.assertDictEqual(result, wanted_extra_dict)
5304
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Prepare vdu interfaces method raises exception.

    _prepare_vdu_interfaces is mocked to raise TypeError. The error is
    expected to reach the caller, and none of the later preparation steps
    (cloud-init, ssh keys, affinity groups, volume handling) is expected
    to run.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    mock_find_persistent_root_volumes.return_value = persistent_root_disk
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    mock_prepare_vdu_interfaces.side_effect = TypeError

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Pin the exact exception type here. The earlier form caught any
    # Exception and then compared type() of the assertRaises context
    # manager (not the exception) with TypeError, which verified nothing.
    with self.assertRaises(TypeError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_not_called()
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_not_called()
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
5371
@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_add_persistent_root_disk_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Add persistent root disk method raises exception.

    _add_persistent_root_disk_to_disk_list is mocked to raise KeyError. The
    steps before it (interfaces, cloud-init, ssh keys) are expected to have
    run; the ordinary-disk and affinity-group steps are expected not to.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Pin the exact exception type here. The earlier form caught any
    # Exception and then compared type() of the assertRaises context
    # manager (not the exception) with KeyError, which verified nothing.
    with self.assertRaises(KeyError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        {
            "depends_on": [
                f"{ns_preffix}:image.0",
                f"{ns_preffix}:flavor.0",
            ]
        },
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )

    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()
5444
def test_select_persistent_root_disk(self):
    """VDU lists persistent-root-volume first; the vsd is returned as is."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    storage_descs = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
    vsd = storage_descs[1]

    self.assertEqual(Ns._select_persistent_root_disk(vsd, vdu), vsd)
5456
def test_select_persistent_root_disk_first_vsd_is_different(self):
    """VDU first virtual-storage-desc is different than vsd id."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    storage_descs = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
    vsd = storage_descs[1]

    # No root disk is expected to be selected in this case.
    self.assertEqual(Ns._select_persistent_root_disk(vsd, vdu), None)
5469
def test_select_persistent_root_disk_vsd_is_not_persistent(self):
    """vsd type is not persistent.

    The VDU lists "persistent-root-volume" first, i.e. the same ordering under
    which test_select_persistent_root_disk gets this vsd back. That way the
    overridden storage type is the only difference from the successful case
    and is what the None result is attributed to.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Keep the root volume first: with "persistent-volume2" first the call
    # already yields None (see the first_vsd_is_different test), which would
    # hide whether the storage type is checked at all.
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    # Only deviation from the successful case: a non-persistent storage type.
    vsd["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertIsNone(result)
5483
def test_select_persistent_root_disk_vsd_does_not_have_size(self):
    """vsd size is None.

    The VDU lists "persistent-root-volume" first, i.e. the same ordering under
    which test_select_persistent_root_disk gets this vsd back. That way the
    missing size is the only difference from the successful case and is what
    the None result is attributed to.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Keep the root volume first: with "persistent-volume2" first the call
    # already yields None (see the first_vsd_is_different test), which would
    # hide whether the size is checked at all.
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    # Only deviation from the successful case: no storage size.
    vsd["size-of-storage"] = None
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertIsNone(result)
5497
def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """Nothing is selected when the VDU does not have virtual-storage-desc."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    # The target VDU fixture is used as-is; no virtual-storage-desc list is
    # assigned to it in this test.
    target = deepcopy(target_vdu_wth_persistent_storage)
    selected = Ns._select_persistent_root_disk(storage_desc, target)
    self.assertEqual(selected, None)
5505
def test_select_persistent_root_disk_invalid_vsd_type(self):
    """AttributeError is raised when vsd is a list instead of a dict."""
    target = deepcopy(target_vdu_wth_persistent_storage)
    # The whole virtual-storage-desc list is passed, not a single entry.
    storage_desc_list = deepcopy(vnfd_wth_persistent_storage)[
        "virtual-storage-desc"
    ]
    self.assertRaises(
        AttributeError,
        Ns._select_persistent_root_disk,
        storage_desc_list,
        target,
    )