Coverage for NG-RO/osm_ng_ro/tests/test_ns.py: 99%

1843 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-06-30 09:51 +0000

1####################################################################################### 

2# Copyright ETSI Contributors and Others. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16####################################################################################### 

17from copy import deepcopy 

18import unittest 

19from unittest.mock import MagicMock, Mock, patch 

20 

21from jinja2 import ( 

22 Environment, 

23 select_autoescape, 

24 StrictUndefined, 

25 TemplateError, 

26 TemplateNotFound, 

27 UndefinedError, 

28) 

29from osm_ng_ro.ns import Ns, NsException 

30 

31 

__author__ = "Eduardo Sousa"
__date__ = "$19-NOV-2021 00:00:00$"

# Variables used in Tests

# VNF descriptor whose single VDU references two persistent volumes and one
# ephemeral volume.
vnfd_wth_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [
                {"id": "several_volumes-VM", "min-number-of-instances": 1},
            ],
        },
    ],
    "id": "several_volumes-vnf",
    "product-name": "several_volumes-vnf",
    "vdu": [
        {
            "id": "several_volumes-VM",
            "name": "several_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": ["ubuntu20.04-aws", "ubuntu20.04-azure"],
            "virtual-storage-desc": [
                "persistent-root-volume",
                "persistent-volume2",
                "ephemeral-volume",
            ],
        },
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {
            "id": "persistent-volume2",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
        },
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
            "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
        },
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {"fs": "mongo", "path": "/app/storage/"},
        "type": "vnfd",
    },
}
vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3"
task_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {
            "params": {
                "net_type": "SR-IOV",
            },
        },
    },
}
# Three interfaces (vdu-eth1..vdu-eth3) that carry no "position" key.
interfaces_wthout_positions = [
    {"name": f"vdu-eth{number}", "ns-vld-id": network}
    for number, network in enumerate(("net1", "net2", "mgmtnet"), start=1)
]
# The same three interfaces, each with an explicit "position" (2, 0, 1).
interfaces_wth_all_positions = [
    {"name": f"vdu-eth{number}", "ns-vld-id": network, "position": position}
    for number, (network, position) in enumerate(
        (("net1", 2), ("net2", 0), ("mgmtnet", 1)), start=1
    )
]

# Target VDU record listing the same three volumes as the persistent-storage
# VNF descriptor fixture.
target_vdu_wth_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "several_volumes-VM",
    "interfaces": [
        {"name": "vdu-eth0", "ns-vld-id": "mgmtnet"},
    ],
    "virtual-storages": [
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        },
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
db = MagicMock(name="database mock")
# The fs mock used to be labelled "database mock" as well (copy-paste of the
# line above), which made the two mocks indistinguishable in repr/failure
# output. NOTE(review): "fs" is presumed to stand for the filesystem handle.
fs = MagicMock(name="filesystem mock")
ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
indata = {
    "name": "sample_name",
}
# Expected extra_dict for the "several_volumes-VM" VDU.
expected_extra_dict = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "description": "several_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volumes-several_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}

# Same shape as expected_extra_dict, for the "without_volumes-VM" VDU; only
# "description" and "name" differ.
expected_extra_dict2 = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "description": "without_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volumes-without_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}

# Keyed by the NS record prefix defined above (same literal value as before).
tasks_by_target_record_id = {
    ns_preffix: {
        "extra_dict": {
            "params": {
                "net_type": "SR-IOV",
            },
        },
    },
}
kwargs = {
    "db": MagicMock(),
    "vdu2cloud_init": {},
    "vnfr": {
        "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "member-vnf-index-ref": "vnf-several-volumes",
    },
}

# VNF descriptor whose single VDU references a root volume (no
# "type-of-storage" key) and an ephemeral volume.
vnfd_wthout_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [
                {"id": "without_volumes-VM", "min-number-of-instances": 1},
            ],
        },
    ],
    "id": "without_volumes-vnf",
    "product-name": "without_volumes-vnf",
    "vdu": [
        {
            "id": "without_volumes-VM",
            "name": "without_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": ["ubuntu20.04-aws", "ubuntu20.04-azure"],
            "virtual-storage-desc": [
                "root-volume",
                "ephemeral-volume",
            ],
        },
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {
            "id": "root-volume",
            "size-of-storage": "10",
        },
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {"fs": "mongo", "path": "/app/storage/"},
        "type": "vnfd",
    },
}

# Target VDU record listing the same two volumes as the descriptor above.
target_vdu_wthout_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "without_volumes-VM",
    "interfaces": [
        {"name": "vdu-eth0", "ns-vld-id": "mgmtnet"},
    ],
    "virtual-storages": [
        {"id": "root-volume", "size-of-storage": "10"},
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}

# cloud-init template with three placeholders: {{type}}, {{is_override}} and
# {{command}}.
# NOTE(review): the leading whitespace inside the two literals below was
# collapsed in the text this was recovered from and has been reconstructed
# with conventional YAML nesting -- confirm the exact indentation against the
# repository copy before relying on byte-exact comparisons.
cloud_init_content = """
disk_setup:
    ephemeral0:
        table_type: {{type}}
        layout: True
        overwrite: {{is_override}}
runcmd:
     - [ ls, -l, / ]
     - [ sh, -xc, "echo $(date) '{{command}}'" ]
"""

# The template above with type=mbr, is_override=False and
# command="& rm -rf /" substituted in.
user_data = """
disk_setup:
    ephemeral0:
        table_type: mbr
        layout: True
        overwrite: False
runcmd:
     - [ ls, -l, / ]
     - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
"""

# Identifiers shared by the vertical-scale / migrate / rebuild fixtures below.
vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
nsr_id_2 = "993166fe-723e-4680-ac4b-b1af2541ae31"
target_record_1 = f"vnfrs:{vnf_id}:vdur.1.vim_info.{target_vim}"
target_record_id = f"vnfrs:{vnf_id}:vdur.{vdu_id}"
expected_result_vertical_scale = {
    "target_id": target_vim,
    "action_id": action_id,
    "nsr_id": nsr_id_2,
    "task_id": f"{action_id}:1",
    "status": "SCHEDULED",
    "action": "EXEC",
    "item": "verticalscale",
    "target_record": target_record_1,
    "target_record_id": target_record_id,
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "flavor_dict": "flavor_dict",
    },
}
vdu = {"id": vdu_id, "vim_info": {target_vim: {"interfaces": []}}}
vnf = {"_id": vnf_id}
# The "params" dicts are deliberately separate objects from the ones embedded
# in the expected_result_* fixtures, so mutating one never affects the other.
extra_dict_vertical_scale = {
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "flavor_dict": "flavor_dict",
    },
}
extra_dict_migrate = {
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "migrate_host": "migrateToHost",
    },
}
expected_result_migrate = {
    "target_id": target_vim,
    "action_id": action_id,
    "nsr_id": nsr_id_2,
    "task_id": f"{action_id}:1",
    "status": "SCHEDULED",
    "action": "EXEC",
    "item": "migrate",
    "target_record": target_record_1,
    "target_record_id": target_record_id,
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "migrate_host": "migrateToHost",
    },
}
# Task index 0 here (the two fixtures above use index 1); this one has neither
# "target_record" nor "params".
expected_result_rebuild_start_stop = {
    "target_id": target_vim,
    "action_id": action_id,
    "nsr_id": nsr_id_2,
    "task_id": f"{action_id}:0",
    "status": "SCHEDULED",
    "action": "EXEC",
    "item": "update",
    "target_record_id": target_record_id,
}

378 

379 

class TestException(Exception):
    """Plain ``Exception`` subclass defined for the tests in this module."""

    # The class name matches pytest's default ``Test*`` collection pattern.
    # Marking it as "not a test" keeps pytest from trying to collect it and
    # emitting a collection warning about its constructor.
    __test__ = False

382 

383 

class CopyingMock(MagicMock):
    """``MagicMock`` that records deep copies of the arguments it is called with."""

    def __call__(self, *args, **kwargs):
        """Deep-copy ``args`` and ``kwargs``, then delegate to ``MagicMock.__call__``."""
        copied_args, copied_kwargs = deepcopy(args), deepcopy(kwargs)
        return super().__call__(*copied_args, **copied_kwargs)

389 

390 

391class TestNs(unittest.TestCase): 

def setUp(self):
    """No per-test fixture is prepared; the tests use module-level data."""

394 

def test__create_task_without_extra_dict(self):
    """Create a task without ``extra_dict``.

    The returned task must contain only the base fields and the
    ``task_index`` in ``deployment_info`` must have moved from 1 to 2.
    """
    deployment_info = {"action_id": "123456", "nsr_id": "654321", "task_index": 1}
    call_arguments = {
        "target_id": "vim_openstack_1",
        "item": "test_item",
        "action": "CREATE",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
    }
    expected_result = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
    }

    task = Ns._create_task(deployment_info=deployment_info, **call_arguments)

    self.assertEqual(deployment_info.get("task_index"), 2)
    self.assertDictEqual(task, expected_result)

424 

def test__create_task(self):
    """Create a task with an ``extra_dict``.

    The ``params``, ``find_params`` and ``depends_on`` entries of
    ``extra_dict`` must show up in the task next to the base fields, and the
    ``task_index`` in ``deployment_info`` must have moved from 1 to 2.
    """
    deployment_info = {"action_id": "123456", "nsr_id": "654321", "task_index": 1}
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }
    expected_result = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
        # values coming from extra_dict
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }

    task = Ns._create_task(
        deployment_info=deployment_info,
        target_id="vim_openstack_1",
        item="test_item",
        action="CREATE",
        target_record="test_target_record",
        target_record_id="test_target_record_id",
        extra_dict=extra_dict,
    )

    self.assertEqual(deployment_info.get("task_index"), 2)
    self.assertDictEqual(task, expected_result)

463 

@patch("osm_ng_ro.ns.time")
def test__create_ro_task(self, mock_time: Mock):
    """Wrap a task into an RO task with ``osm_ng_ro.ns.time`` patched.

    The patched clock returns a fixed value, which is expected in the
    ``modified_at``, ``created_at`` and ``to_check_at`` fields.
    """
    now = 1637324838.994551
    mock_time.return_value = now
    task = {
        "target_id": "vim_openstack_1",
        "action_id": "123456",
        "nsr_id": "654321",
        "task_id": "123456:1",
        "status": "SCHEDULED",
        "action": "CREATE",
        "item": "test_item",
        "target_record": "test_target_record",
        "target_record_id": "test_target_record_id",
        # values coming from extra_dict
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": "test_depends_on",
    }
    # Every vim_info field except "created" is expected to be None.
    expected_vim_info = {"created": False}
    expected_vim_info.update(
        dict.fromkeys(
            (
                "created_items",
                "vim_id",
                "vim_name",
                "vim_status",
                "vim_details",
                "vim_message",
                "refresh_at",
            )
        )
    )
    expected_result = {
        "_id": "123456:1",
        "locked_by": None,
        "locked_at": 0.0,
        "target_id": "vim_openstack_1",
        "vim_info": expected_vim_info,
        "modified_at": now,
        "created_at": now,
        "to_check_at": now,
        "tasks": [task],
    }

    ro_task = Ns._create_ro_task(target_id="vim_openstack_1", task=task)

    self.assertDictEqual(ro_task, expected_result)

510 

def test__process_image_params_with_empty_target_image(self):
    """An empty target image is expected to yield empty ``find_params``."""
    result = Ns._process_image_params(
        target_image={},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual({"find_params": {}}, result)

525 

def test__process_image_params_with_wrong_target_image(self):
    """A target image holding only an unrelated key yields empty ``find_params``."""
    unrelated_image = {"no_image": "to_see_here"}

    result = Ns._process_image_params(
        target_image=unrelated_image,
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual({"find_params": {}}, result)

542 

def test__process_image_params_with_image(self):
    """The ``image`` key is expected to become a ``name`` filter."""
    expected_result = {"find_params": {"filter_dict": {"name": "cirros"}}}

    result = Ns._process_image_params(
        target_image={"image": "cirros"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(expected_result, result)

563 

def test__process_image_params_with_vim_image_id(self):
    """The ``vim_image_id`` key is expected to become an ``id`` filter."""
    expected_result = {"find_params": {"filter_dict": {"id": "123456"}}}

    result = Ns._process_image_params(
        target_image={"vim_image_id": "123456"},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(expected_result, result)

584 

def test__process_image_params_with_image_checksum(self):
    """The ``image_checksum`` key is expected to become a ``checksum`` filter."""
    checksum = "e3fc50a88d0a364313df4b21ef20c29e"
    expected_result = {"find_params": {"filter_dict": {"checksum": checksum}}}

    result = Ns._process_image_params(
        target_image={"image_checksum": checksum},
        indata=None,
        vim_info=None,
        target_record_id=None,
    )

    self.assertDictEqual(expected_result, result)

605 

def test__get_resource_allocation_params_with_empty_target_image(self):
    """An empty quota descriptor is expected to produce an empty dict.

    Despite the method name, the input is a quota descriptor, not an image.
    """
    result = Ns._get_resource_allocation_params(quota_descriptor={})

    self.assertDictEqual({}, result)

615 

def test__get_resource_allocation_params_with_wrong_target_image(self):
    """A quota descriptor with only an unrelated key produces an empty dict.

    Despite the method name, the input is a quota descriptor, not an image.
    """
    unrelated_descriptor = {"no_quota": "present_here"}

    result = Ns._get_resource_allocation_params(
        quota_descriptor=unrelated_descriptor,
    )

    self.assertDictEqual({}, result)

627 

def test__get_resource_allocation_params_with_limit(self):
    """A string ``limit`` is expected back as an integer."""
    result = Ns._get_resource_allocation_params(
        quota_descriptor={"limit": "10"},
    )

    self.assertDictEqual({"limit": 10}, result)

641 

def test__get_resource_allocation_params_with_reserve(self):
    """A string ``reserve`` is expected back as an integer."""
    result = Ns._get_resource_allocation_params(
        quota_descriptor={"reserve": "20"},
    )

    self.assertDictEqual({"reserve": 20}, result)

655 

def test__get_resource_allocation_params_with_shares(self):
    """A string ``shares`` is expected back as an integer."""
    result = Ns._get_resource_allocation_params(
        quota_descriptor={"shares": "30"},
    )

    self.assertDictEqual({"shares": 30}, result)

669 

def test__get_resource_allocation_params(self):
    """``limit``, ``reserve`` and ``shares`` together are all expected as integers."""
    quota_descriptor = {"limit": "10", "reserve": "20", "shares": "30"}
    expected_result = {"limit": 10, "reserve": 20, "shares": 30}

    result = Ns._get_resource_allocation_params(quota_descriptor=quota_descriptor)

    self.assertDictEqual(expected_result, result)

687 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu(
    self,
    resource_allocation,
):
    """Empty quota, ``epa_vcpu_set`` True: empty result, helper never called."""
    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={},
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, result)
    self.assertFalse(resource_allocation.called)

704 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """Empty quota, ``epa_vcpu_set`` False: empty result, helper never called."""
    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={},
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, result)
    self.assertFalse(resource_allocation.called)

721 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu(
    self,
    resource_allocation,
):
    """Unrelated quota key, ``epa_vcpu_set`` True: empty result, helper never called."""
    unrelated_quota = {"no-quota": "nothing"}

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=unrelated_quota,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, result)
    self.assertFalse(resource_allocation.called)

740 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """Unrelated quota key, ``epa_vcpu_set`` False: empty result, helper never called."""
    unrelated_quota = {"no-quota": "nothing"}

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=unrelated_quota,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, result)
    self.assertFalse(resource_allocation.called)

759 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu(
    self,
    resource_allocation,
):
    """``cpu-quota`` with ``epa_vcpu_set`` True: empty result, helper never called."""
    cpu_only_quota = {
        "cpu-quota": {"limit": "10", "reserve": "20", "shares": "30"},
    }

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=cpu_only_quota,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, result)
    self.assertFalse(resource_allocation.called)

782 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """``cpu-quota`` with ``epa_vcpu_set`` False.

    The helper must be called once with the raw quota and its return value
    must appear under ``cpu-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"cpu-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"cpu-quota": quota(str)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

822 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu(
    self,
    resource_allocation,
):
    """``mem-quota`` with ``epa_vcpu_set`` True.

    The helper must be called once with the raw quota and its return value
    must appear under ``mem-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"mem-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"mem-quota": quota(str)},
        epa_vcpu_set=True,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

862 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """``mem-quota`` with ``epa_vcpu_set`` False.

    The helper must be called once with the raw quota and its return value
    must appear under ``mem-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"mem-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"mem-quota": quota(str)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

902 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu(
    self,
    resource_allocation,
):
    """``disk-io-quota`` with ``epa_vcpu_set`` True.

    The helper must be called once with the raw quota and its return value
    must appear under ``disk-io-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"disk-io-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"disk-io-quota": quota(str)},
        epa_vcpu_set=True,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

942 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """``disk-io-quota`` with ``epa_vcpu_set`` False.

    The helper must be called once with the raw quota and its return value
    must appear under ``disk-io-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"disk-io-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"disk-io-quota": quota(str)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

982 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu(
    self,
    resource_allocation,
):
    """``vif-quota`` with ``epa_vcpu_set`` True.

    The helper must be called once with the raw quota and its return value
    must appear under ``vif-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"vif-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"vif-quota": quota(str)},
        epa_vcpu_set=True,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

1022 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu(
    self,
    resource_allocation,
):
    """``vif-quota`` with ``epa_vcpu_set`` False.

    The helper must be called once with the raw quota and its return value
    must appear under ``vif-quota`` in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {"vif-quota": quota(int)}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota={"vif-quota": quota(str)},
        epa_vcpu_set=False,
    )

    resource_allocation.assert_called_once_with(quota(str))
    self.assertDictEqual(expected_result, result)

1062 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_quota_epa_cpu(
    self,
    resource_allocation,
):
    """All four quotas with ``epa_vcpu_set`` True.

    ``cpu-quota`` must be left out of the result while ``mem-quota``,
    ``disk-io-quota`` and ``vif-quota`` carry the helper's return value.
    The helper is therefore expected to run exactly three times: once per
    quota that ends up in the result.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    expected_result = {
        name: quota(int) for name in ("mem-quota", "disk-io-quota", "vif-quota")
    }
    guest_epa_quota = {
        name: quota(str)
        for name in ("cpu-quota", "mem-quota", "disk-io-quota", "vif-quota")
    }
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=guest_epa_quota,
        epa_vcpu_set=True,
    )

    # Stricter than a bare "was called": an extra call made for cpu-quota
    # would go unnoticed otherwise.
    self.assertEqual(resource_allocation.call_count, 3)
    self.assertDictEqual(expected_result, result)

1122 

@patch("osm_ng_ro.ns.Ns._get_resource_allocation_params")
def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set(
    self,
    resource_allocation,
):
    """All four quotas with ``epa_vcpu_set`` False.

    Every quota, ``cpu-quota`` included, must carry the helper's return
    value in the result. The helper is therefore expected to run exactly
    four times: once per quota.
    """

    def quota(cast):
        # A fresh dict per call, so input, expectation and mock share no state.
        return {"limit": cast(10), "reserve": cast(20), "shares": cast(30)}

    quota_names = ("cpu-quota", "mem-quota", "disk-io-quota", "vif-quota")
    expected_result = {name: quota(int) for name in quota_names}
    guest_epa_quota = {name: quota(str) for name in quota_names}
    resource_allocation.return_value = quota(int)

    result = Ns._process_guest_epa_quota_params(
        guest_epa_quota=guest_epa_quota,
        epa_vcpu_set=False,
    )

    # Stricter than a bare "was called": a missing or duplicated helper call
    # would go unnoticed otherwise.
    self.assertEqual(resource_allocation.call_count, 4)
    self.assertDictEqual(expected_result, result)

1187 

def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    """Empty input: expect an empty NUMA list and ``epa_vcpu_set`` False."""
    numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
        guest_epa_quota={},
    )

    expected_numa_result, expected_epa_vcpu_set_result = [], False
    self.assertEqual(expected_numa_result, numa_result)
    self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)

1198 

def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    """Unrelated key only: expect an empty NUMA list and ``epa_vcpu_set`` False."""
    unrelated_params = {"no_nume": "here"}

    numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_params,
    )

    expected_numa_result, expected_epa_vcpu_set_result = [], False
    self.assertEqual(expected_numa_result, numa_result)
    self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result)

1210 

def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    """An empty "numa-node-policy" section yields no NUMA entries."""
    empty_policy = {"numa-node-policy": {}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_policy,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1222 

def test__process_guest_epa_numa_params_with_no_node(self):
    """A NUMA policy whose node list is empty yields no NUMA entries."""
    policy_without_nodes = {"numa-node-policy": {"node": []}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy_without_nodes,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1238 

def test__process_guest_epa_numa_params_with_1_node_num_cores(self):
    """A node carrying "num-cores" maps to "cores" and sets the flag."""
    node_list = [{"num-cores": 3}]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual([{"cores": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)

1258 

def test__process_guest_epa_numa_params_with_1_node_paired_threads(self):
    """"num-paired-threads" is returned as an int under "paired_threads"."""
    node_list = [{"paired-threads": {"num-paired-threads": "3"}}]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual([{"paired_threads": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)

1278 

def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self):
    """Thread-id pairs come back as string tuples; the flag stays False."""
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    node_list = [{"paired-threads": {"paired-thread-ids": thread_pairs}}]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}
    expected_nodes = [{"paired-threads-id": [("0", "1"), ("4", "5")]}]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual(expected_nodes, numa_nodes)
    self.assertEqual(False, vcpu_flag)

1313 

def test__process_guest_epa_numa_params_with_1_node_num_threads(self):
    """A string "num-threads" is returned as an int under "threads"."""
    node_list = [{"num-threads": "3"}]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual([{"threads": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)

1333 

def test__process_guest_epa_numa_params_with_1_node_memory_mb(self):
    """A "memory-mb" of 2048 is reported as "memory": 2; flag stays False."""
    node_list = [{"memory-mb": 2048}]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual([{"memory": 2}], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1353 

def test__process_guest_epa_numa_params_with_1_node_vcpu(self):
    """String node and vcpu ids are returned as ints; flag stays False."""
    node_list = [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}
    expected_nodes = [{"id": 0, "vcpu": [0, 1]}]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual(expected_nodes, numa_nodes)
    self.assertEqual(False, vcpu_flag)

1374 

def test__process_guest_epa_numa_params_with_2_node_vcpu(self):
    """Two nodes with vcpu lists are each converted, in input order."""
    node_list = [
        {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
        {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
    ]
    guest_epa_quota = {"numa-node-policy": {"node": node_list}}
    expected_nodes = [
        {"id": 0, "vcpu": [0, 1]},
        {"id": 1, "vcpu": [2, 3]},
    ]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual(expected_nodes, numa_nodes)
    self.assertEqual(False, vcpu_flag)

1403 

def test__process_guest_epa_numa_params_with_1_node(self):
    """A node with cores, paired threads, threads and memory is converted.

    The input node carries no "id" or "vcpu" entries, and none are
    expected in the output.
    """
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": thread_pairs,
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    guest_epa_quota = {"numa-node-policy": {"node": [node]}}
    expected_nodes = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        }
    ]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual(expected_nodes, numa_nodes)
    self.assertEqual(True, vcpu_flag)

1448 

def test__process_guest_epa_numa_params_with_2_nodes(self):
    """Two fully populated nodes are converted independently, in order."""
    first_node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": [
                {"thread-a": 0, "thread-b": 1},
                {"thread-a": 4, "thread-b": 5},
            ],
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    second_node = {
        "num-cores": 7,
        "paired-threads": {
            "num-paired-threads": "7",
            "paired-thread-ids": [
                {"thread-a": 2, "thread-b": 3},
                {"thread-a": 5, "thread-b": 6},
            ],
        },
        "num-threads": "4",
        "memory-mb": 4096,
    }
    guest_epa_quota = {
        "numa-node-policy": {"node": [first_node, second_node]},
    }
    expected_nodes = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        },
        {
            "cores": 7,
            "paired_threads": 7,
            "paired-threads-id": [("2", "3"), ("5", "6")],
            "threads": 4,
            "memory": 4,
        },
    ]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=guest_epa_quota,
    )

    self.assertEqual(expected_nodes, numa_nodes)
    self.assertEqual(True, vcpu_flag)

1516 

def test__process_guest_epa_cpu_pinning_params_with_empty_params(self):
    """Empty input yields an empty dict and leaves the flag False."""
    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota={},
        vcpu_count=0,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, pinning)
    self.assertEqual(False, vcpu_flag)

1532 

def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self):
    """Input without a "cpu-pinning-policy" key yields an empty dict."""
    unrelated_params = {"no-cpu-pinning-policy": "here"}

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=unrelated_params,
        vcpu_count=0,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, pinning)
    self.assertEqual(False, vcpu_flag)

1550 

def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self):
    """With epa_vcpu_set already True, nothing is added and it stays True."""
    dedicated_policy = {"cpu-pinning-policy": "DEDICATED"}

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=dedicated_policy,
        vcpu_count=0,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, pinning)
    self.assertEqual(True, vcpu_flag)

1568 

def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self):
    """DEDICATED pinning with PREFER reports vcpu_count under "threads"."""
    pinning_policy = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
    }

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1587 

def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self):
    """DEDICATED pinning with ISOLATE reports vcpu_count under "cores"."""
    pinning_policy = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "ISOLATE",
    }

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"cores": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1606 

def test__process_guest_epa_cpu_pinning_params_with_policy_require(self):
    """DEDICATED pinning with REQUIRE reports vcpu_count under "threads"."""
    pinning_policy = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "REQUIRE",
    }

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1625 

def test__process_guest_epa_cpu_pinning_params(self):
    """DEDICATED pinning with no thread policy reports "threads"."""
    dedicated_policy = {"cpu-pinning-policy": "DEDICATED"}

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=dedicated_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1643 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_empty_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """An empty flavor yields an empty dict and calls no EPA helper."""
    result = Ns._process_epa_params(
        target_flavor={},
    )

    self.assertDictEqual({}, result)
    # None of the three patched helpers may have been reached.
    self.assertFalse(guest_epa_numa_params.called)
    self.assertFalse(guest_epa_cpu_pinning_params.called)
    self.assertFalse(guest_epa_quota_params.called)

1664 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_wrong_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """A flavor without "guest-epa" yields an empty dict, no helper calls."""
    flavor_without_epa = {"no-guest-epa": "here"}

    result = Ns._process_epa_params(
        target_flavor=flavor_without_epa,
    )

    self.assertDictEqual({}, result)
    # None of the three patched helpers may have been reached.
    self.assertFalse(guest_epa_numa_params.called)
    self.assertFalse(guest_epa_cpu_pinning_params.called)
    self.assertFalse(guest_epa_quota_params.called)

1687 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """The NUMA "mem-policy" is surfaced when the helpers return nothing."""
    # All three helpers are stubbed to contribute no data of their own.
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    guest_epa = {
        "vcpu-count": 1,
        "numa-node-policy": {"mem-policy": "STRICT"},
    }

    result = Ns._process_epa_params(
        target_flavor={"guest-epa": guest_epa},
    )

    self.assertDictEqual({"mem-policy": "STRICT"}, result)
    self.assertTrue(guest_epa_numa_params.called)
    self.assertTrue(guest_epa_cpu_pinning_params.called)
    self.assertTrue(guest_epa_quota_params.called)

1721 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_mempage_size(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """"mempage-size" is carried through next to the NUMA "mem-policy"."""
    # All three helpers are stubbed to contribute no data of their own.
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    guest_epa = {
        "vcpu-count": 1,
        "mempage-size": "1G",
        "numa-node-policy": {"mem-policy": "STRICT"},
    }
    expected_result = {"mempage-size": "1G", "mem-policy": "STRICT"}

    result = Ns._process_epa_params(
        target_flavor={"guest-epa": guest_epa},
    )

    self.assertDictEqual(expected_result, result)
    self.assertTrue(guest_epa_numa_params.called)
    self.assertTrue(guest_epa_cpu_pinning_params.called)
    self.assertTrue(guest_epa_quota_params.called)

1757 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_numa(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Helper outputs are merged with the pinning policies and page size.

    Expected values, the input flavor and the mocked return values are
    all built as independent objects.
    """
    quota_keys = ("cpu-quota", "mem-quota", "disk-io-quota", "vif-quota")

    # Stubbed helper results.
    guest_epa_numa_params.return_value = (
        [
            {
                "cores": 3,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
                "memory": 2,
            },
        ],
        True,
    )
    guest_epa_cpu_pinning_params.return_value = ({"threads": 3}, True)
    guest_epa_quota_params.return_value = {
        key: {"limit": 10, "reserve": 20, "shares": 30} for key in quota_keys
    }

    # Input flavor: policies and NUMA node first, then the four quotas.
    guest_epa = {
        "vcpu-count": 1,
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numa-node-policy": {
            "node": [
                {
                    "num-cores": 3,
                    "paired-threads": {
                        "num-paired-threads": "3",
                        "paired-thread-ids": [
                            {"thread-a": 0, "thread-b": 1},
                            {"thread-a": 4, "thread-b": 5},
                        ],
                    },
                    "num-threads": "3",
                    "memory-mb": 2048,
                },
            ],
        },
    }
    guest_epa.update(
        {
            key: {"limit": "10", "reserve": "20", "shares": "30"}
            for key in quota_keys
        }
    )
    target_flavor = {"guest-epa": guest_epa}

    expected_result = {
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numas": [
            {
                "cores": 3,
                "memory": 2,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
            }
        ],
    }
    expected_result.update(
        {key: {"limit": 10, "reserve": 20, "shares": 30} for key in quota_keys}
    )

    result = Ns._process_epa_params(
        target_flavor=target_flavor,
    )

    self.assertEqual(expected_result, result)
    self.assertTrue(guest_epa_numa_params.called)
    self.assertTrue(guest_epa_cpu_pinning_params.called)
    self.assertTrue(guest_epa_quota_params.called)

1884 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_target_flavor(
    self,
    epa_params,
):
    """An empty target flavor raises KeyError before EPA processing."""
    vnf_entry = {"vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18"}
    indata = {"vnf": [vnf_entry]}

    with self.assertRaises(KeyError):
        Ns._process_flavor_params(
            target_flavor={},
            indata=indata,
            vim_info={},
            target_record_id="",
        )

    self.assertFalse(epa_params.called)

1910 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_target_flavor(
    self,
    epa_params,
):
    """A flavor lacking the expected keys raises KeyError; no EPA call."""
    unusable_flavor = {"no-target-flavor": "here"}

    with self.assertRaises(KeyError):
        Ns._process_flavor_params(
            target_flavor=unusable_flavor,
            indata={},
            vim_info={},
            target_record_id="",
        )

    self.assertFalse(epa_params.called)

1932 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_indata(
    self,
    epa_params,
):
    """With empty indata the flavor data comes from the target flavor only."""
    epa_params.return_value = {}

    target_flavor = {
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    # "params" differs from "find_params" only by the extra "name" entry.
    sizing = {"disk": 10, "ram": 1024, "vcpus": 2}
    expected_result = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": dict(sizing, name="test")},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata={},
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

1976 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_indata(
    self,
    epa_params,
):
    """Indata without a "vnf" key gives the same result as empty indata."""
    epa_params.return_value = {}

    target_flavor = {
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    indata = {"no-vnf": "here"}
    # "params" differs from "find_params" only by the extra "name" entry.
    sizing = {"disk": 10, "ram": 1024, "vcpus": 2}
    expected_result = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": dict(sizing, name="test")},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

2022 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_ephemeral_disk(
    self,
    epa_params,
):
    """An ephemeral virtual storage on the vdur adds "ephemeral" to the flavor.

    The expected size (10) matches the vdur's storage entry, not the
    1 declared in the VNFD returned by the mocked database.
    """
    epa_params.return_value = {}

    # VNFD document served by the mocked database.
    vnfd = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [
            {
                "id": "without_volumes-VM",
                "name": "without_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {"id": "root-volume", "size-of-storage": "10"},
            {
                "id": "ephemeral-volume",
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "1",
            },
        ],
        "_admin": {
            "storage": {
                "fs": "mongo",
                "path": "/app/storage/",
            },
            "type": "vnfd",
        },
    }
    db.get_one.return_value = vnfd

    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "10",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    sizing = {"disk": 10, "ram": 1024, "vcpus": 2, "ephemeral": 10}
    expected_result = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": dict(sizing, name="test")},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

2132 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_swap_disk(
    self,
    epa_params,
):
    """A swap virtual storage on the vdur adds "swap" to the flavor data."""
    epa_params.return_value = {}

    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                "size-of-storage": "20",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    sizing = {"disk": 10, "ram": 1024, "vcpus": 2, "swap": 20}
    expected_result = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": dict(sizing, name="test")},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

2196 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_persistent_root_disk(
    self,
    epa_params,
):
    """A persistent root volume results in a flavor "disk" size of 0."""
    epa_params.return_value = {}

    # VNFD document served by the mocked database.
    vnfd = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "several_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "several_volumes-vnf",
        "product-name": "several_volumes-vnf",
        "vdu": [
            {
                "id": "several_volumes-VM",
                "name": "several_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": [
                    "persistent-root-volume",
                ],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {
                "id": "persistent-root-volume",
                "type-of-storage": "persistent-storage:persistent-storage",
                "size-of-storage": "10",
            },
        ],
        "_admin": {
            "storage": {
                "fs": "mongo",
                "path": "/app/storage/",
            },
            "type": "vnfd",
        },
    }
    db.get_one.return_value = vnfd

    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "vdu-name": "several_volumes-VM",
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "persistent-storage:persistent-storage",
                "size-of-storage": "10",
            },
        ],
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }

    # "storage-gb" is 10 in the flavor, yet the expected disk size is 0.
    sizing = {"disk": 0, "ram": 1024, "vcpus": 2}
    expected_result = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": dict(sizing, name="test")},
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
        db=db,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

2306 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_epa_params(
    self,
    epa_params,
):
    """Non-empty EPA parameters show up under "extended" in both sections."""
    epa_params.return_value = {"numa": "there-is-numa-here"}

    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    indata = {
        "vnf": [
            {
                "vdur": [{"ns-flavor-id": "test_id"}],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    expected_result = {
        "find_params": {
            "flavor_data": {
                "disk": 10,
                "ram": 1024,
                "vcpus": 2,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
        "params": {
            "flavor_data": {
                "disk": 10,
                "name": "test",
                "ram": 1024,
                "vcpus": 2,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
    }

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

2370 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_vim_flavor_id(
    self,
    epa_params,
):
    """A "vim_flavor_id" in the vdur's OSM params is the only find param.

    In that case the result has no "params" section and the EPA helper
    is never called.
    """
    epa_params.return_value = {}

    target_flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "additionalParams": {"OSM": {"vim_flavor_id": "test.flavor"}},
    }
    indata = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    expected_result = {"find_params": {"vim_flavor_id": "test.flavor"}}

    result = Ns._process_flavor_params(
        target_flavor=target_flavor,
        indata=indata,
        vim_info={},
        target_record_id="",
    )

    self.assertFalse(epa_params.called)
    self.assertDictEqual(result, expected_result)

2417 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params(
    self,
    epa_params,
):
    """Ephemeral and swap virtual storages end up in the flavor data.

    The vdur declares an ephemeral storage of "10" and a swap storage of
    "20"; both are expected as integers next to disk/ram/vcpus and the
    mocked EPA data.
    """
    epa_params.return_value = {"numa": "there-is-numa-here"}

    # VNFD returned by the `db` mock that is defined outside this test.
    db.get_one.return_value = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [
            {
                "id": "without_volumes-VM",
                "name": "without_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {"id": "root-volume", "size-of-storage": "10"},
            {
                "id": "ephemeral-volume",
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "1",
            },
        ],
        "_admin": {
            "storage": {"fs": "mongo", "path": "/app/storage/"},
            "type": "vnfd",
        },
    }
    kwargs = {"db": db}

    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur = {
        "ns-flavor-id": "test_id",
        "virtual-storages": [
            {
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "10",
            },
            {
                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                "size-of-storage": "20",
            },
        ],
    }
    ns_request = {
        "vnf": [
            {
                "vdur": [vdur],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    expected_result = {
        "find_params": {
            "flavor_data": {
                "disk": 10,
                "ram": 1024,
                "vcpus": 2,
                "ephemeral": 10,
                "swap": 20,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
        "params": {
            "flavor_data": {
                "disk": 10,
                "name": "test",
                "ram": 1024,
                "vcpus": 2,
                "ephemeral": 10,
                "swap": 20,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
    }

    result = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_request,
        vim_info={},
        target_record_id="",
        **kwargs,
    )

    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected_result)

2542 

def test__process_net_params_with_empty_params(
    self,
):
    """A plain VLD with an empty record id yields a "bridge" network.

    The net name is built from the NS and VLD names, and the ip profile and
    provider network are copied from vim_info.
    """
    vld = {"name": "vld-name"}
    ns_request = {"name": "ns-name"}
    vim_details = {
        "provider_network": "some-profile-here",
        "ip_profile": {"some_ip_profile": "here"},
    }
    expected_result = {
        "params": {
            "net_name": "ns-name-vld-name",
            "net_type": "bridge",
            "ip_profile": {"some_ip_profile": "here"},
            "provider_network_profile": "some-profile-here",
        }
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="",
    )

    self.assertDictEqual(expected_result, result)

2578 

def test__process_net_params_with_vim_info_sdn(
    self,
):
    """SDN vim_info without a target VIM: params keep everything but "sdn"."""
    vld = {"name": "vld-name"}
    ns_request = {"name": "ns-name"}
    vim_details = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "type": "sdn-type",
    }
    expected_result = {
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "type": "sdn-type",
        }
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(expected_result, result)

2611 

def test__process_net_params_with_vim_info_sdn_target_vim(
    self,
):
    """SDN vim_info with a target VIM additionally produces "depends_on"."""
    vld = {"name": "vld-name"}
    ns_request = {"name": "ns-name"}
    vim_details = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "target_vim": "some-vim",
        "type": "sdn-type",
    }
    expected_result = {
        "depends_on": ["some-vim vld.sdn"],
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "target_vim": "some-vim",
            "type": "sdn-type",
        },
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(expected_result, result)

2647 

def test__process_net_params_with_vim_network_name(
    self,
):
    """A vim_network_name becomes a find filter on the network name."""
    vld = {"name": "vld-name"}
    ns_request = {"name": "ns-name"}
    vim_details = {"vim_network_name": "some-network-name"}
    expected_result = {
        "find_params": {
            "filter_dict": {"name": "some-network-name"},
        },
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(expected_result, result)

2677 

def test__process_net_params_with_vim_network_id(
    self,
):
    """A vim_network_id becomes a find filter on the network id."""
    vld = {"name": "vld-name"}
    ns_request = {"name": "ns-name"}
    vim_details = {"vim_network_id": "some-network-id"}
    expected_result = {
        "find_params": {
            "filter_dict": {"id": "some-network-id"},
        },
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(expected_result, result)

2707 

def test__process_net_params_with_mgmt_network(
    self,
):
    """A management VLD is looked up by its id with the "mgmt" flag set."""
    vld = {
        "id": "vld-id",
        "name": "vld-name",
        "mgmt-network": "some-mgmt-network",
    }
    ns_request = {"name": "ns-name"}
    expected_result = {
        "find_params": {"mgmt": True, "name": "vld-id"},
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info={},
        target_record_id="vld.sdn.something",
    )

    self.assertDictEqual(expected_result, result)

2736 

def test__process_net_params_with_underlay_eline(
    self,
):
    """An underlay VLD of type ELINE is expected to map to net_type "ptp"."""
    vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELINE",
    }
    ns_request = {"name": "ns-name"}
    vim_details = {
        "provider_network": "some-profile-here",
        "ip_profile": {"some_ip_profile": "here"},
    }
    expected_result = {
        "params": {
            "ip_profile": {"some_ip_profile": "here"},
            "net_name": "ns-name-vld-name",
            "net_type": "ptp",
            "provider_network_profile": "some-profile-here",
        }
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="",
    )

    self.assertDictEqual(expected_result, result)

2774 

def test__process_net_params_with_underlay_elan(
    self,
):
    """An underlay VLD of type ELAN is expected to map to net_type "data"."""
    vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELAN",
    }
    ns_request = {"name": "ns-name"}
    vim_details = {
        "provider_network": "some-profile-here",
        "ip_profile": {"some_ip_profile": "here"},
    }
    expected_result = {
        "params": {
            "ip_profile": {"some_ip_profile": "here"},
            "net_name": "ns-name-vld-name",
            "net_type": "data",
            "provider_network_profile": "some-profile-here",
        }
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_request,
        vim_info=vim_details,
        target_record_id="",
    )

    self.assertDictEqual(expected_result, result)

2812 

def test__get_cloud_init_exception(self):
    """An empty location string makes _get_cloud_init raise NsException."""
    database = MagicMock(name="database mock")

    with self.assertRaises(NsException):
        Ns._get_cloud_init(db=database, fs=None, location="")

2821 

def test__get_cloud_init_file_fs_exception(self):
    """A ":file:" location without a filesystem object raises NsException."""
    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {
                "folder": "/home/osm",
                "pkg-dir": "vnfr_test_dir",
            },
        },
    }

    with self.assertRaises(NsException):
        Ns._get_cloud_init(
            db=database, fs=None, location="vnfr_id_123456:file:test_file"
        )

2838 

def test__get_cloud_init_file(self):
    """A ":file:" location returns what the mocked file handle reads."""
    cloud_init_content = "this is a cloud init file content"

    # File handle used as a context manager whose read() yields the content.
    file_handle = MagicMock(name="file mock")
    file_handle.__enter__.return_value.read.return_value = cloud_init_content

    filesystem = MagicMock(name="filesystem mock")
    filesystem.file_open.return_value = file_handle

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {
                "folder": "/home/osm",
                "pkg-dir": "vnfr_test_dir",
            },
        },
    }

    result = Ns._get_cloud_init(
        db=database, fs=filesystem, location="vnfr_id_123456:file:test_file"
    )

    self.assertEqual(cloud_init_content, result)

2861 

def test__get_cloud_init_vdu(self):
    """A ":vdu:0" location returns the cloud-init stored in the db record."""
    cloud_init_content = "this is a cloud init file content"

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "vdu": {
            0: {"cloud-init": cloud_init_content},
        },
    }

    # No filesystem is supplied for a vdu location.
    result = Ns._get_cloud_init(
        db=database, fs=None, location="vnfr_id_123456:vdu:0"
    )

    self.assertEqual(cloud_init_content, result)

2880 

@patch("jinja2.Environment.__init__")
def test__parse_jinja2_undefined_error(self, env_mock: Mock):
    """An UndefinedError from the jinja2 Environment surfaces as NsException."""
    env_mock.side_effect = UndefinedError("UndefinedError occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)

2893 

@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_error(self, env_mock: Mock):
    """A TemplateError from the jinja2 Environment surfaces as NsException."""
    env_mock.side_effect = TemplateError("TemplateError occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)

2906 

@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_not_found(self, env_mock: Mock):
    """A TemplateNotFound from the jinja2 Environment surfaces as NsException."""
    env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)

2919 

def test_rendering_jinja2_temp_without_special_characters(self):
    """Parameters without escapable characters are rendered verbatim."""
    template = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    expected_result = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
    """
    values = {
        "type": "mbr",
        "is_override": "False",
        "command": "; mkdir abc",
    }

    result = Ns._parse_jinja2(
        cloud_init_content=template, params=values, context="cloud-init for VM"
    )

    self.assertEqual(result, expected_result)

2951 

def test_rendering_jinja2_temp_with_special_characters(self):
    """A parameter containing "&" must not be rendered verbatim.

    The command parameter is the same text that appears in the unescaped
    expectation, so the inequality can only come from escaping done during
    rendering. Previously the parameter lacked the trailing " /", which made
    the inequality hold regardless of escaping.
    """
    template = """
    disk_setup:
        ephemeral0:
            table_type: {{type}}
            layout: True
            overwrite: {{is_override}}
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '{{command}}'" ]
    """
    unescaped_result = """
    disk_setup:
        ephemeral0:
            table_type: mbr
            layout: True
            overwrite: False
    runcmd:
         - [ ls, -l, / ]
         - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
    """
    values = {
        "type": "mbr",
        "is_override": "False",
        "command": "& rm -rf /",
    }

    result = Ns._parse_jinja2(
        cloud_init_content=template, params=values, context="cloud-init for VM"
    )

    self.assertNotEqual(result, unescaped_result)
    # NOTE(review): assumes Ns._parse_jinja2 renders with jinja2 autoescape
    # enabled (the sibling autoescape_is_false test implies it) - confirm.
    self.assertIn("&amp; rm -rf /", result)

2983 

def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self):
    """With autoescape disabled, "&" in a parameter is rendered verbatim."""
    with patch("osm_ng_ro.ns.Environment") as mock_environment:
        # Substitute an Environment whose autoescape is switched off.
        no_escape = select_autoescape(default_for_string=False, default=False)
        mock_environment.return_value = Environment(
            undefined=StrictUndefined,
            autoescape=no_escape,
        )
        template = """
        disk_setup:
            ephemeral0:
                table_type: {{type}}
                layout: True
                overwrite: {{is_override}}
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '{{command}}'" ]
        """
        expected_result = """
        disk_setup:
            ephemeral0:
                table_type: mbr
                layout: True
                overwrite: False
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
        """
        values = {
            "type": "mbr",
            "is_override": "False",
            "command": "& rm -rf /",
        }
        result = Ns._parse_jinja2(
            cloud_init_content=template,
            params=values,
            context="cloud-init for VM",
        )
        self.assertEqual(result, expected_result)

3022 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__successful(self, assign_vim):
    """start, stop and rebuild each produce the expected task for vdu 0."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "0", 0
    target_record = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"

    for action in ("start", "stop", "rebuild"):
        params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = params

        expected_result = deepcopy(expected_result_rebuild_start_stop)
        expected_result["target_record"] = target_record
        expected_result["params"] = params

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected_result)

3052 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__empty_extra_dict__task_without_params(
    self, assign_vim
):
    """With an empty extra_dict the task equals the base expectation."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "0", 0

    expected_result = deepcopy(expected_result_rebuild_start_stop)
    expected_result[
        "target_record"
    ] = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"

    # The action is never passed in, so the same call is checked three times.
    for _ in ("start", "stop", "rebuild"):
        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected_result)

3078 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__different_vdu_index__target_record_changes(
    self, assign_vim
):
    """Using vdu index "4" is reflected as "vdur.4" in the target record."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "4", 0
    target_record = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"

    for action in ("start", "stop", "rebuild"):
        params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = params

        expected_result = deepcopy(expected_result_rebuild_start_stop)
        expected_result["target_record"] = target_record
        expected_result["params"] = params

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected_result)

3110 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__different_task_index__task_id_changes(
    self, assign_vim
):
    """Using task index 3 is reflected as the ":3" suffix of the task id."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "0", 3
    target_record = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"

    for action in ("start", "stop", "rebuild"):
        params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = params

        expected_result = deepcopy(expected_result_rebuild_start_stop)
        expected_result["target_record"] = target_record
        expected_result["params"] = params
        expected_result["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:3"

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected_result)

3143 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__assign_vim_raises__task_is_not_created(
    self, assign_vim
):
    """When _assign_vim raises, the exception propagates and no task results.

    The post-condition checks run after the assertRaises block: statements
    placed behind the raising call inside the block would never execute.
    """
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "0", 0

    for action in ("start", "stop", "rebuild"):
        extra_dict["params"] = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        assign_vim.side_effect = TestException("Can not connect to VIM.")

        # Pre-bind so the check below is meaningful when the call raises.
        task = None
        with self.assertRaises(TestException) as err:
            task = self.ns.rebuild_start_stop_task(
                vdu_id,
                vnf_id,
                vdu_index,
                action_id,
                nsr_id_2,
                task_index,
                target_vim,
                extra_dict,
            )

        self.assertIsNone(task)
        self.assertEqual(str(err.exception), "Can not connect to VIM.")

3173 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__successful(self, assign_vim):
    """verticalscale_task returns the expected task for vdu and task index 1."""
    self.ns = Ns()
    vdu_index, task_index = "1", 1

    task = self.ns.verticalscale_task(
        vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict_vertical_scale
    )

    self.assertDictEqual(task, expected_result_vertical_scale)

3189 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__task_index_changes__task_id_changes(self, assign_vim):
    """Using task index 2 is reflected as the ":2" suffix of the task id."""
    self.ns = Ns()
    vdu_index, task_index = "1", 2

    expected_result = deepcopy(expected_result_vertical_scale)
    expected_result["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:2"

    task = self.ns.verticalscale_task(
        vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict_vertical_scale
    )

    self.assertDictEqual(task, expected_result)

3207 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__empty_extra_dict__expected_result_without_params(
    self, assign_vim
):
    """With an empty extra_dict the resulting task has no "params" entry."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "1", 1

    expected_result = deepcopy(expected_result_vertical_scale)
    del expected_result["params"]

    task = self.ns.verticalscale_task(
        vdu,
        vnf,
        vdu_index,
        action_id,
        nsr_id_2,
        task_index,
        extra_dict,
    )

    self.assertDictEqual(task, expected_result)

3222 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__assign_vim_raises__task_is_not_created(
    self, assign_vim
):
    """When _assign_vim raises, the exception propagates and no task results.

    The post-condition checks run after the assertRaises block: statements
    placed behind the raising call inside the block would never execute.
    """
    self.ns = Ns()
    vdu_index, task_index = "1", 1
    assign_vim.side_effect = TestException("Can not connect to VIM.")

    # Pre-bind so the check below is meaningful when the call raises.
    task = None
    with self.assertRaises(TestException) as err:
        task = self.ns.verticalscale_task(
            vdu,
            vnf,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            extra_dict_vertical_scale,
        )

    self.assertIsNone(task)
    self.assertEqual(str(err.exception), "Can not connect to VIM.")

3243 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__successful(self, assign_vim):
    """migrate_task returns the expected task for vdu and task index 1."""
    self.ns = Ns()
    vdu_index, task_index = "1", 1

    task = self.ns.migrate_task(
        vdu,
        vnf,
        vdu_index,
        action_id,
        nsr_id_2,
        task_index,
        extra_dict_migrate,
    )

    self.assertDictEqual(task, expected_result_migrate)

3253 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__empty_extra_dict__task_without_params(self, assign_vim):
    """With an empty extra_dict the resulting task has no "params" entry."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index, task_index = "1", 1

    expected_result = deepcopy(expected_result_migrate)
    del expected_result["params"]

    task = self.ns.migrate_task(
        vdu,
        vnf,
        vdu_index,
        action_id,
        nsr_id_2,
        task_index,
        extra_dict,
    )

    self.assertDictEqual(task, expected_result)

3266 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__different_vdu_index__target_record_with_different_vdu_index(
    self, assign_vim
):
    """Using vdu index "4" is reflected as "vdur.4" in the target record."""
    self.ns = Ns()
    vdu_index, task_index = "4", 1

    expected_result = deepcopy(expected_result_migrate)
    expected_result.update(
        {
            "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
        }
    )

    task = self.ns.migrate_task(
        vdu,
        vnf,
        vdu_index,
        action_id,
        nsr_id_2,
        task_index,
        extra_dict_migrate,
    )

    self.assertDictEqual(task, expected_result)

3282 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__assign_vim_raises__task_is_not_created(self, assign_vim):
    """When _assign_vim raises, the exception propagates and no task results.

    The post-condition checks run after the assertRaises block: statements
    placed behind the raising call inside the block would never execute.
    """
    self.ns = Ns()
    vdu_index, task_index = "1", 1
    assign_vim.side_effect = TestException("Can not connect to VIM.")

    # Pre-bind so the check below is meaningful when the call raises.
    task = None
    with self.assertRaises(TestException) as err:
        task = self.ns.migrate_task(
            vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict_migrate
        )

    self.assertIsNone(task)
    self.assertEqual(str(err.exception), "Can not connect to VIM.")

3295 

3296 

3297class TestProcessVduParams(unittest.TestCase): 

def setUp(self):
    """Create a fresh Ns instance and a logger mock for every test."""
    self.logger = CopyingMock(autospec=True)
    self.ns = Ns()

3301 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """Root volume is taken from the VNFD when no instantiation volumes exist."""
    mock_volume_keeping_required.return_value = True
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    disk_list = []

    # Descriptor entry the keep-volume check is expected to receive.
    expected_root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
    }
    expected_persist_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    expected_disk_list = [
        {"image_id": "ubuntu20.04", "size": "10", "keep": True},
    ]

    persist_root_disk = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(persist_root_disk, expected_persist_root_disk)
    mock_volume_keeping_required.assert_called_once_with(expected_root_disk)
    self.assertEqual(disk_list, expected_disk_list)
    self.assertEqual(len(disk_list), 1)

3339 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
    self, mock_volume_keeping_required
):
    """The first virtual-storage-desc of the vdu is used as the root volume."""
    mock_volume_keeping_required.return_value = True
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    # Put a non-root persistent volume first in the vdu's storage list.
    vnfd["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    disk_list = []

    expected_root_disk = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    expected_persist_root_disk = {
        "persistent-volume2": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    expected_disk_list = [
        {"image_id": "ubuntu20.04", "size": "10", "keep": True},
    ]

    persist_root_disk = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(persist_root_disk, expected_persist_root_disk)
    mock_volume_keeping_required.assert_called_once_with(expected_root_disk)
    self.assertEqual(disk_list, expected_disk_list)
    self.assertEqual(len(disk_list), 1)

3381 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_size_of_storage(
    self, mock_volume_keeping_required
):
    """An empty size-of-storage yields no root disk and no disk_list entry."""
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][0]["size-of-storage"] = ""
    vnfd["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    disk_list = []

    persist_root_disk = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(persist_root_disk, {})
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disk_list, [])

3403 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_keeping_is_not_required(
    self, mock_volume_keeping_required
):
    """The "keep" flag is False when the keep-volume check returns False."""
    mock_volume_keeping_required.return_value = False
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
        {"key": "keep-volume", "value": "false"},
    ]
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    disk_list = []

    expected_root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
    }
    expected_persist_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    expected_disk_list = [
        {"image_id": "ubuntu20.04", "size": "10", "keep": False},
    ]

    persist_root_disk = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(persist_root_disk, expected_persist_root_disk)
    mock_volume_keeping_required.assert_called_once_with(expected_root_disk)
    self.assertEqual(disk_list, expected_disk_list)
    self.assertEqual(len(disk_list), 1)

3444 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_target_vdu_mismatch(
    self, mock_volume_keeping_required
):
    """Nothing is found when the VNFD vdu name differs from the target vdu."""
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    # Same name with different letter case, so it no longer matches.
    vnfd["vdu"][0]["name"] = "Several_Volumes-VM"
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = []
    disk_list = []

    result = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(result, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disk_list, [])
    self.assertEqual(len(disk_list), 0)

3462 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_with_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """An existing VIM volume named like the root volume is reused."""
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-root-volume"}
    ]
    disk_list = []

    expected_persist_root_disk = {
        "persistent-root-volume": {
            "vim_volume_id": vim_volume_id,
            "image_id": "ubuntu20.04",
        },
    }
    expected_disk_list = [
        {"vim_volume_id": vim_volume_id, "image_id": "ubuntu20.04"},
    ]

    persist_root_disk = self.ns.find_persistent_root_volumes(
        vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(persist_root_disk, expected_persist_root_disk)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disk_list, expected_disk_list)
    self.assertEqual(len(disk_list), 1)

3496 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_invalid_instantiation_params(
    self, mock_volume_keeping_required
):
    """Root volume lookup with a wrongly named volume id key.

    The instantiation entry uses "volume-id" instead of "vim-volume-id";
    a KeyError is expected and the disk list must remain empty.
    """
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    instantiation_volumes = [
        {"volume-id": vim_volume_id, "name": "persistent-root-volume"}
    ]
    disks = []

    with self.assertRaises(KeyError):
        self.ns.find_persistent_root_volumes(
            descriptor,
            target_vdu_wth_persistent_storage,
            instantiation_volumes,
            disks,
        )

    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disks, [])
    self.assertEqual(len(disks), 0)

3518 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup with a root disk and no instantiation volumes.

    The root disk is already in the disk list; one more entry (size "10",
    keep False) is expected to be appended, and the keep-volume check is
    expected to be called once with the "persistent-volume2" storage.
    """
    mock_volume_keeping_required.return_value = False
    root_volume = {"image_id": "ubuntu20.04", "size": "10", "keep": False}
    persistent_root_disk = {"persistent-root-volume": dict(root_volume)}
    disks = [dict(root_volume)]

    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(disks, [dict(root_volume), {"size": "10", "keep": False}])
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        }
    )

3562 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup with a vim-volume-id instantiation parameter.

    The instantiation entry is named "persistent-volume2"; the appended disk
    entry must carry only the given VIM volume id, and the keep-volume check
    must not be used.
    """
    root_volume = {"image_id": "ubuntu20.04", "size": "10", "keep": False}
    persistent_root_disk = {"persistent-root-volume": dict(root_volume)}
    instantiation_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume2"}
    ]
    disks = [dict(root_volume)]

    self.ns.find_persistent_volumes(
        persistent_root_disk,
        target_vdu_wth_persistent_storage,
        instantiation_volumes,
        disks,
    )

    self.assertEqual(disks, [dict(root_volume), {"vim_volume_id": vim_volume_id}])
    mock_volume_keeping_required.assert_not_called()

3604 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wthout_persistent_storage(
    self, mock_volume_keeping_required
):
    """Find persistent ordinary volume, there is not any persistent disk.

    With an empty root-disk dict, no instantiation volumes and the
    ``target_vdu_wthout_persistent_storage`` fixture, the disk list must
    stay empty and the keep-volume check must never be consulted.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = target_vdu_wthout_persistent_storage
    disk_list = []
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    # Compare against a fresh literal: comparing disk_list with itself can
    # never fail and therefore verifies nothing.
    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()

3620 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
    self, mock_volume_keeping_required
):
    """There is persistent root disk, but there is not ordinary persistent disk.

    The target vdu only declares the persistent root volume (already present
    in ``persistent_root_disk``) and an ephemeral volume, so the disk list is
    expected to be left exactly as it was before the call.
    """
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["virtual-storages"] = [
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ]
    disk_list = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    # Snapshot taken before the call; comparing disk_list with itself could
    # never detect an unexpected append or mutation.
    expected_disk_list = deepcopy(disk_list)
    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )
    self.assertEqual(disk_list, expected_disk_list)
    mock_volume_keeping_required.assert_not_called()

3663 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup when the instantiation volume name differs.

    The instantiation entry is named "persistent-volume3", so the supplied
    vim-volume-id is not used: a new disk entry (size "10", keep True) is
    expected, and the keep-volume check is called once with the
    "persistent-volume2" storage.
    """
    mock_volume_keeping_required.return_value = True
    root_volume = {"image_id": "ubuntu20.04", "size": "10", "keep": False}
    persistent_root_disk = {"persistent-root-volume": dict(root_volume)}
    instantiation_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume3"}
    ]
    disks = [dict(root_volume)]

    self.ns.find_persistent_volumes(
        persistent_root_disk,
        target_vdu_wth_persistent_storage,
        instantiation_volumes,
        disks,
    )

    self.assertEqual(disks, [dict(root_volume), {"size": "10", "keep": True}])
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        }
    )

3714 

def test_is_volume_keeping_required_true(self):
    """Expect True when the storage has a "keep-volume" requirement set to "true"."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), True)

3727 

def test_is_volume_keeping_required_false(self):
    """Expect False when the storage has a "keep-volume" requirement set to "false"."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), False)

3740 

def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """Expect False when the storage has no "vdu-storage-requirements" key."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), False)

3750 

def test_is_volume_keeping_required_wrong_keyword(self):
    """Expect False when the requirement key is "hold-volume" instead of "keep-volume"."""
    storage = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "hold-volume", "value": "true"}],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(storage), False)

3763 

def test_sort_vdu_interfaces_position_all_wth_positions(self):
    """Sorting interfaces that all carry a position.

    The interfaces are supplied with position 2 before position 1 and are
    expected to end up in ascending position order.
    """
    eth0 = {"name": "vdu-eth0", "ns-vld-id": "mgmtnet", "position": 1}
    eth1 = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 2}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [dict(eth1), dict(eth0)]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [eth0, eth1])

3793 

def test_sort_vdu_interfaces_position_some_wth_position(self):
    """Sorting interfaces where only one carries a position.

    The positioned interface is expected first, followed by the one
    without a position.
    """
    unpositioned = {"name": "vdu-eth0", "ns-vld-id": "mgmtnet"}
    positioned = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 1}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [dict(unpositioned), dict(positioned)]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [positioned, unpositioned])

3821 

def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """Sorting an empty interface list leaves it empty."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [])

3829 

def test_partially_locate_vdu_interfaces(self):
    """Partial placement when two of four interfaces carry positions (1 and 3).

    After the call, index 0 must hold the position-1 interface and index 2
    the position-3 interface.
    """
    at_position_1 = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 1}
    at_position_3 = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        dict(at_position_3),
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
        dict(at_position_1),
    ]

    self.ns._partially_locate_vdu_interfaces(vdu)

    self.assertDictEqual(vdu["interfaces"][0], at_position_1)
    self.assertDictEqual(vdu["interfaces"][2], at_position_3)

3862 

def test_partially_locate_vdu_interfaces_position_start_from_0(self):
    """Partial placement when two of four interfaces carry positions (0 and 3).

    After the call, index 0 must hold the position-0 interface and index 3
    the position-3 interface.
    """
    at_position_0 = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 0}
    at_position_3 = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        dict(at_position_3),
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
        dict(at_position_0),
    ]

    self.ns._partially_locate_vdu_interfaces(vdu)

    self.assertDictEqual(vdu["interfaces"][0], at_position_0)
    self.assertDictEqual(vdu["interfaces"][3], at_position_3)

3895 

def test_partially_locate_vdu_interfaces_wthout_position(self):
    """Partial placement with the ``interfaces_wthout_positions`` fixture.

    The interface list is expected to be equal to a copy taken before the call.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # The module-level fixture list is assigned directly (not copied), as before.
    vdu["interfaces"] = interfaces_wthout_positions
    snapshot = deepcopy(vdu["interfaces"])

    self.ns._partially_locate_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], snapshot)

3903 

def test_partially_locate_vdu_interfaces_all_has_position(self):
    """Partial placement with the ``interfaces_wth_all_positions`` fixture.

    The interfaces are expected in position order 0, 1, 2 after the call.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # The module-level fixture list is assigned directly (not copied), as before.
    vdu["interfaces"] = interfaces_wth_all_positions

    self.ns._partially_locate_vdu_interfaces(vdu)

    self.assertEqual(
        vdu["interfaces"],
        [
            {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 0},
            {"name": "vdu-eth3", "ns-vld-id": "mgmtnet", "position": 1},
            {"name": "vdu-eth1", "ns-vld-id": "net1", "position": 2},
        ],
    )

3927 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init(self, mock_parse_jinja2, mock_get_cloud_init):
    """Cloud-init preparation for a vdu with both cloud-init and boot-data-drive.

    The result must contain the parsed user data and the boot data drive;
    both helpers are expected to be called exactly once.
    """
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.return_value = user_data
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"cloud-init": "sample-cloud-init-path", "boot-data-drive": "vda"})

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(
        cloud_config, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )

3952 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception.

    The NsException raised by the patched ``_get_cloud_init`` must reach the
    caller with its message intact, and ``_parse_jinja2`` must not be called.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.side_effect = NsException(
        "Mismatch descriptor for cloud init."
    )

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
    # Checked after the with block: statements placed after the raising call
    # inside the block would never run, so the message would go unverified.
    self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")

    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_not_called()

3975 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception.

    The NsException raised by the patched ``_parse_jinja2`` must reach the
    caller with its message intact; both helpers are called exactly once.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.side_effect = NsException("Error parsing cloud-init content.")

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
    # Checked after the with block: statements placed after the raising call
    # inside the block would never run, so the message would go unverified.
    self.assertEqual(str(err.exception), "Error parsing cloud-init content.")

    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )

4000 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Cloud-init preparation for a vdu with cloud-init but no boot-data-drive.

    The result must contain only the parsed user data.
    """
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.return_value = user_data
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["cloud-init"] = "sample-cloud-init-path"

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(cloud_config, {"user-data": user_data})
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )

4025 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Cloud-init preparation when the content is already in ``vdu2cloud_init``.

    ``_get_cloud_init`` must not be called; the pre-loaded content is
    expected to be handed to ``_parse_jinja2``.
    """
    mock_parse_jinja2.return_value = user_data
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"cloud-init": "sample-cloud-init-path", "boot-data-drive": "vda"})
    preloaded = {"sample-cloud-init-path": cloud_init_content}

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, preloaded, db, fs)

    self.assertDictEqual(
        cloud_config, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )

4049 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_no_cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """Cloud-init preparation for a vdu with boot-data-drive but no cloud-init.

    The result must contain only the boot data drive, and neither helper
    may be called.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["boot-data-drive"] = "vda"

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(cloud_config, {"boot-data-drive": "vda"})
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_not_called()

4066 

def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self):
    """With both ns-vld-id and vnf-vld-id set, the ns-prefixed text is expected."""
    iface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    net_text = self.ns._check_vld_information_of_interfaces(
        iface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, f"{ns_preffix}:vld.mgmtnet")

4079 

def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict is expected to give an empty string."""
    net_text = self.ns._check_vld_information_of_interfaces(
        {}, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, "")

4087 

def test_check_vld_information_of_interfaces_has_only_vnf_vld(self):
    """With only vnf-vld-id set, the vnf-prefixed text is expected."""
    iface = {"name": "vdu-eth0", "vnf-vld-id": "mgmt_cp_int"}
    net_text = self.ns._check_vld_information_of_interfaces(
        iface, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, f"{vnf_preffix}:vld.mgmt_cp_int")

4099 

def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
    self,
):
    """Interface dict has only vnf_vld but vnf_preffix does not exist.

    Passing None as the vnf prefix is expected to raise TypeError.
    """
    interface = {
        "name": "vdu-eth0",
        "vnf-vld-id": "mgmt_cp_int",
    }
    vnf_preffix = None
    # Assert the concrete exception type directly. The previous follow-up check
    # compared type(err), which is the assertRaises context manager and never
    # the raised exception, so the TypeError expectation was not enforced.
    with self.assertRaises(TypeError):
        self.ns._check_vld_information_of_interfaces(
            interface, ns_preffix, vnf_preffix
        )

4114 

def test_prepare_interface_port_security_has_security_details(self):
    """Port security keys are expected to be renamed to their underscore form.

    "port-security-enabled" becomes "port_security" and
    "port-security-disable-strategy" becomes "port_security_disable_strategy".
    """
    common = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    iface = dict(common)
    iface["port-security-enabled"] = True
    iface["port-security-disable-strategy"] = "allow-address-pairs"
    wanted = dict(common)
    wanted["port_security"] = True
    wanted["port_security_disable_strategy"] = "allow-address-pairs"

    self.ns._prepare_interface_port_security(iface)

    self.assertDictEqual(iface, wanted)

4133 

def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict is expected to stay empty."""
    iface = {}
    self.ns._prepare_interface_port_security(iface)
    self.assertDictEqual(iface, {})

4140 

def test_prepare_interface_port_security_wthout_port_security(self):
    """An interface without port security keys is expected to be left unchanged."""
    iface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    untouched = dict(iface)

    self.ns._prepare_interface_port_security(iface)

    self.assertDictEqual(iface, untouched)

4155 

def test_create_net_item_of_interface_floating_ip_port_security(self):
    """Net item creation for an interface with floating ip and port security.

    The expected item carries the name, port security and floating ip
    fields plus a "TASK-" prefixed net_id and type "virtual".
    """
    carried_over = {
        "name": "vdu-eth0",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
    }
    iface = dict(carried_over)
    iface["vcpi"] = "sample_vcpi"
    iface["ns-vld-id"] = "mgmtnet"
    iface["vnf-vld-id"] = "mgmt_cp_int"
    wanted = dict(carried_over)
    wanted["net_id"] = f"TASK-{ns_preffix}"
    wanted["type"] = "virtual"

    net_item = self.ns._create_net_item_of_interface(iface, f"{ns_preffix}")

    self.assertDictEqual(net_item, wanted)

4178 

def test_create_net_item_of_interface_invalid_net_text(self):
    """Net item creation with None as net text is expected to raise TypeError."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    with self.assertRaises(TypeError):
        self.ns._create_net_item_of_interface(iface, None)

4193 

def test_create_net_item_of_interface_empty_interface(self):
    """Net item creation for an empty interface: only net_id and type are expected."""
    net_item = self.ns._create_net_item_of_interface({}, ns_preffix)
    self.assertDictEqual(
        net_item, {"net_id": f"TASK-{ns_preffix}", "type": "virtual"}
    )

4204 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_sriov(self, mock_deep_get):
    """Interface type handling for SR-IOV with a truthy ``deep_get`` result.

    The net item is expected to get use "data" with model and type "SR-IOV",
    and the task's net_type parameter is expected to be "data".
    """
    mock_deep_get.return_value = "SR-IOV"
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "SR-IOV",
    }
    net_text = ns_preffix
    item = {}

    self.ns._prepare_type_of_interface(
        iface, tasks_by_target_record_id, net_text, item
    )

    self.assertDictEqual(item, {"use": "data", "model": "SR-IOV", "type": "SR-IOV"})
    self.assertEqual(
        "data",
        tasks_by_target_record_id[net_text]["extra_dict"]["params"]["net_type"],
    )
    mock_deep_get.assert_called_once_with(
        tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type"
    )

4237 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
    self, mock_deep_get
):
    """Interface type handling for PCI-PASSTHROUGH when ``deep_get`` returns {}.

    The net item is expected to get use "data" with model and type
    "PCI-PASSTHROUGH"; ``deep_get`` is called once on the empty task dict.
    """
    mock_deep_get.return_value = {}
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "PCI-PASSTHROUGH",
    }
    tasks = {}
    net_text = ns_preffix
    item = {}

    self.ns._prepare_type_of_interface(iface, tasks, net_text, item)

    self.assertDictEqual(
        item,
        {"use": "data", "model": "PCI-PASSTHROUGH", "type": "PCI-PASSTHROUGH"},
    )
    mock_deep_get.assert_called_once_with(
        tasks, net_text, "extra_dict", "params", "net_type"
    )

4269 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get):
    """Interface type handling for OM-MGMT.

    The net item is expected to contain only use "mgmt", and ``deep_get``
    must not be called.
    """
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "OM-MGMT",
    }
    item = {}

    self.ns._prepare_type_of_interface(iface, {}, ns_preffix, item)

    self.assertDictEqual(item, {"use": "mgmt"})
    mock_deep_get.assert_not_called()

4294 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_bridge(self, mock_deep_get):
    """Interface type handling when the interface has no "type" key.

    The net item is expected to get use "bridge" and model None, and
    ``deep_get`` must not be called.
    """
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    item = {}

    self.ns._prepare_type_of_interface(iface, {}, ns_preffix, item)

    self.assertDictEqual(item, {"use": "bridge", "model": None})
    mock_deep_get.assert_not_called()

4319 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Prepare three connected vdu interfaces successfully.

    The four helper methods are mocked. The test checks the positional
    arguments each helper receives per interface, the resulting net_list
    and the updated extra_dict, and that nothing is logged as an error.
    """
    ifaces = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "ip-address": "13.2.12.31",
            "mgmt-interface": True,
        },
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = list(ifaces)
    extra = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }

    vld_texts = [
        f"{ns_preffix}:net1",
        f"{vnf_preffix}:net2",
        f"{ns_preffix}:mgmtnet",
    ]
    net_items = [
        {"name": iface["name"], "net_id": f"TASK-{ns_preffix}", "type": "virtual"}
        for iface in ifaces
    ]
    mock_item_of_interface.side_effect = net_items
    mock_vld_information_of_interface.side_effect = vld_texts

    # Expectations are built before the call: the first two items are
    # copies carrying the extra address key, the third is the mocked item.
    wanted_extra = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": list(vld_texts),
        "mgmt_vdu_interface": 0,
    }
    wanted_nets = [
        {**net_items[0], "ip_address": "13.2.12.31"},
        {**net_items[1], "mac_address": "d0:94:66:ed:fc:e2"},
        net_items[2],
    ]
    collected = []

    self.ns._prepare_vdu_interfaces(
        vdu,
        extra,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        collected,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for idx, iface in enumerate(ifaces):
        self.assertEqual(vld_calls[idx][0], (iface, ns_preffix, vnf_preffix))

    security_calls = mock_port_security.call_args_list
    for idx, iface in enumerate(ifaces):
        self.assertEqual(security_calls[idx].args[0], iface)

    item_calls = mock_item_of_interface.call_args_list
    for idx, iface in enumerate(ifaces):
        self.assertEqual(item_calls[idx][0], (iface, vld_texts[idx]))

    type_calls = mock_type_of_interface.call_args_list
    for idx, iface in enumerate(ifaces):
        self.assertEqual(
            type_calls[idx][0],
            (iface, tasks_by_target_record_id, vld_texts[idx], net_items[idx]),
        )

    self.assertEqual(collected, wanted_nets)
    self.assertEqual(extra, wanted_extra)
    self.logger.error.assert_not_called()

4442 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_create_net_item_raise_exception(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Prepare vdu interfaces, create_net_item_of_interface method raise exception.

    The mocked _create_net_item_of_interface raises TypeError, which is
    expected to propagate. The follow-up assertions check what was done for
    the first interface before the exception was raised.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    interface_1 = {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "ip-address": "13.2.12.31",
        "mgmt-interface": True,
    }
    interface_2 = {
        "name": "vdu-eth2",
        "vnf-vld-id": "net2",
        "mac-address": "d0:94:66:ed:fc:e2",
    }
    interface_3 = {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
    }
    target_vdu["interfaces"] = [interface_1, interface_2, interface_3]
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    net_text_1 = f"{ns_preffix}:net1"
    mock_item_of_interface.side_effect = [TypeError, TypeError, TypeError]

    mock_vld_information_of_interface.side_effect = [net_text_1]
    net_list = []
    expected_extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [net_text_1],
    }
    with self.assertRaises(TypeError):
        self.ns._prepare_vdu_interfaces(
            target_vdu,
            extra_dict,
            ns_preffix,
            vnf_preffix,
            self.logger,
            tasks_by_target_record_id,
            net_list,
        )

    # These checks must stay outside the assertRaises block: anything placed
    # after the raising call inside the block would never be executed.
    _call_mock_vld_information_of_interface = (
        mock_vld_information_of_interface.call_args_list
    )
    self.assertEqual(
        _call_mock_vld_information_of_interface[0][0],
        (interface_1, ns_preffix, vnf_preffix),
    )

    _call_mock_port_security = mock_port_security.call_args_list
    self.assertEqual(_call_mock_port_security[0].args[0], interface_1)

    _call_mock_item_of_interface = mock_item_of_interface.call_args_list
    self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1))

    mock_type_of_interface.assert_not_called()
    self.logger.error.assert_not_called()
    self.assertEqual(net_list, [])
    self.assertEqual(extra_dict, expected_extra_dict)

4516 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_vld_information_is_empty(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Prepare vdu interfaces when the vld lookup yields an empty string.

    Every interface is expected to produce one error log entry, and neither
    net_list nor extra_dict may change. The remaining helpers must not be
    called at all.
    """
    ifaces = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "ip-address": "13.2.12.31",
            "mgmt-interface": True,
        },
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = list(ifaces)
    extra = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    mock_vld_information_of_interface.side_effect = ["", "", ""]
    collected = []

    self.ns._prepare_vdu_interfaces(
        vdu,
        extra,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        collected,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for idx, iface in enumerate(ifaces):
        self.assertEqual(vld_calls[idx][0], (iface, ns_preffix, vnf_preffix))

    error_calls = self.logger.error.call_args_list
    wanted_messages = (
        "Interface 0 from vdu several_volumes-VM not connected to any vld",
        "Interface 1 from vdu several_volumes-VM not connected to any vld",
        "Interface 2 from vdu several_volumes-VM not connected to any vld",
    )
    for idx, message in enumerate(wanted_messages):
        self.assertEqual(error_calls[idx][0], (message,))

    self.assertEqual(collected, [])
    self.assertEqual(
        extra,
        {
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": [],
        },
    )

    mock_item_of_interface.assert_not_called()
    mock_port_security.assert_not_called()
    mock_type_of_interface.assert_not_called()

4605 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_empty_interface_list(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Prepare vdu interfaces with no interfaces: no helper may be called."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []

    self.ns._prepare_vdu_interfaces(
        vdu,
        {},
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        [],
    )

    mock_type_of_interface.assert_not_called()
    mock_vld_information_of_interface.assert_not_called()
    mock_item_of_interface.assert_not_called()
    mock_port_security.assert_not_called()

4635 

def test_prepare_vdu_ssh_keys(self):
    """Vdu ssh-keys and the RO public key both end up in key-pairs.

    The target vdu carries ssh-keys, requires ssh access, and an
    ro_nsr_public_key is supplied.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-keys"] = ["sample-ssh-key"]
    vdu["ssh-access-required"] = True
    nsr_public_key = {"public_key": "path_of_public_key"}
    config = {}

    self.ns._prepare_vdu_ssh_keys(vdu, nsr_public_key, config)

    self.assertDictEqual(
        config,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )

4648 

def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """Without vdu ssh-keys only the RO public key ends up in key-pairs."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-access-required"] = True
    nsr_public_key = {"public_key": "path_of_public_key"}
    config = {}

    self.ns._prepare_vdu_ssh_keys(vdu, nsr_public_key, config)

    self.assertDictEqual(
        config, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )

4658 

def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """With ssh access not required only the vdu ssh-keys end up in key-pairs."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["ssh-keys"] = ["sample-ssh-key"]
    vdu["ssh-access-required"] = False
    nsr_public_key = {"public_key": "path_of_public_key"}
    config = {}

    self.ns._prepare_vdu_ssh_keys(vdu, nsr_public_key, config)

    self.assertDictEqual(config, {"key-pairs": ["sample-ssh-key"]})

4669 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_false(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Root disk is appended to disk_list with keep=False.

    Both the disk selection and the keep-volume decision are mocked.
    """
    selected_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.return_value = selected_disk
    mock_volume_keeping_required.return_value = False

    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][1] = selected_disk
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    root_disks = {}
    disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    self.assertEqual(
        disks,
        [
            {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": False,
            }
        ],
    )
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_called_once()

4701 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Add persistent root disk to disk_list, _select_persistent_root_disk raises.

    The mocked selection raises AttributeError, which is expected to
    propagate. disk_list must stay empty and the keep-volume check must
    not be reached.
    """
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.side_effect = AttributeError
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk = {}
    disk_list = []
    with self.assertRaises(AttributeError):
        self.ns._add_persistent_root_disk_to_disk_list(
            vnfd, target_vdu, persistent_root_disk, disk_list
        )

    # Kept outside the assertRaises block so the checks run after the
    # exception has been caught.
    self.assertEqual(disk_list, [])
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_not_called()

4726 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_true(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Root disk is appended to disk_list with keep=True.

    The keep-volume check is mocked to return True and must be called
    exactly once with the selected root disk.
    """
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_volume_keeping_required.return_value = True
    selected_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [
            {"key": "keep-volume", "value": "true"},
        ],
    }
    mock_select_persistent_root_disk.return_value = selected_disk
    root_disks = {}
    disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    self.assertEqual(
        disks,
        [
            {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": True,
            }
        ],
    )
    mock_volume_keeping_required.assert_called_once_with(selected_disk)

4759 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list(
    self, mock_volume_keeping_required
):
    """Add persistent ordinary disk, keep volume set to False.

    The keep-volume check is mocked to return False. The ordinary disk
    "persistent-volume2" must be appended to disk_list with keep=False,
    and the check must be called once with that disk's descriptor.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_volume_keeping_required.return_value = False
    # Only the root volume is already known, so the ordinary volume is new.
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    ordinary_disk = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    persistent_ordinary_disk = {}
    disk_list = []
    extra_dict = {}
    expected_disk_list = [
        {
            "size": "10",
            "keep": False,
            "multiattach": False,
            "name": "persistent-volume2",
        }
    ]
    self.ns._add_persistent_ordinary_disks_to_disk_list(
        target_vdu,
        persistent_root_disk,
        persistent_ordinary_disk,
        disk_list,
        extra_dict,
    )
    self.assertEqual(disk_list, expected_disk_list)
    mock_volume_keeping_required.assert_called_once_with(ordinary_disk)

4799 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
    self, mock_volume_keeping_required
):
    """Ordinary disk whose id is already in the root disk dict is not added.

    disk_list must stay empty and the keep-volume check must not be called.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_volume_keeping_required.return_value = False
    known_root_disks = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        },
        "persistent-volume2": {
            "size": "10",
        },
    }
    ordinary_disks = {}
    disks = []
    extra = {}

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        vdu,
        known_root_disks,
        ordinary_disks,
        disks,
        extra,
    )

    self.assertEqual(disks, [])
    mock_volume_keeping_required.assert_not_called()

4830 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """VNFD without persistent storage: nothing is added to disk_list.

    The mocked selection returns None. It is expected to be called twice,
    and the keep-volume check must not be called.
    """
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    mock_select_persistent_root_disk.return_value = None
    root_disks = {}
    disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()

4848 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Empty persistent_root_disk dict: nothing is added to disk_list.

    The mocked selection returns None. It is expected to be called twice,
    and the keep-volume check must not be called.
    """
    # NOTE(review): setup and assertions are the same as in
    # test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage;
    # confirm whether a different fixture was intended here.
    descriptor = deepcopy(vnfd_wthout_persistent_storage)
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    mock_select_persistent_root_disk.return_value = None
    root_disks = {}
    disks = []

    self.ns._add_persistent_root_disk_to_disk_list(
        descriptor, vdu, root_disks, disks
    )

    self.assertEqual(disks, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()

4866 

def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
    """Invalid extra dict.

    extra_dict is empty, so NsException with the message
    "Invalid extra_dict format." is expected.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id"
    extra_dict = {}
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)
    # err.exception is only populated once the context manager has exited,
    # so the message check has to live outside the with block.
    self.assertEqual(str(err.exception), "Invalid extra_dict format.")

4876 

def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A single affinity group yields one TASK- entry and one dependency."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    extra = {"depends_on": []}
    prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_text = (
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
        ":affinity-or-anti-affinity-group.sample_affinity-group-id"
    )

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, extra, prefix)

    self.assertDictEqual(extra, {"depends_on": [group_text]})
    self.assertEqual(groups, [{"affinity_group_id": "TASK-" + group_text}])

4891 

def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """Two affinity groups yield two TASK- entries and two dependencies."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    extra = {"depends_on": []}
    prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_texts = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
        ":affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
        ":affinity-or-anti-affinity-group.affinity-group-id2",
    ]

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, extra, prefix)

    self.assertDictEqual(extra, {"depends_on": list(group_texts)})
    self.assertEqual(
        groups,
        [{"affinity_group_id": "TASK-" + text} for text in group_texts],
    )

4913 

def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """Without affinity groups the result is empty and extra_dict is untouched."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    extra = {"depends_on": []}
    prefix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"

    groups = self.ns._prepare_vdu_affinity_group_list(vdu, extra, prefix)

    self.assertDictEqual(extra, {"depends_on": []})
    self.assertEqual(groups, [])

4924 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_with_inst_vol_list(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Instantiation volume list is provided through additionalParams.

    With one entry in additionalParams.OSM.vdu_volumes, find_persistent_volumes
    is expected to be called with that list, while the two
    _add_persistent_*_to_disk_list helpers must not be called.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    target_vdu["interfaces"] = interfaces_wth_all_positions

    vdu_instantiation_vol_list = [
        {
            "vim-volume-id": vim_volume_id,
            "name": "persistent-volume2",
        }
    ]
    target_vdu["additionalParams"] = {
        "OSM": {"vdu_volumes": vdu_instantiation_vol_list}
    }
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
        }
    }
    mock_find_persistent_root_volumes.return_value = persistent_root_disk

    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    expected_extra_dict_copy = deepcopy(expected_extra_dict)
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
    )
    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        expected_extra_dict_copy,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(result, expected_extra_dict_copy)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_called_once_with(
        persistent_root_disk, target_vdu, vdu_instantiation_vol_list, []
    )

5007 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_affinity_groups(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There are affinity groups.

    The mocked _prepare_vdu_affinity_group_list returns two groups, which
    must appear under params.affinity_group_list in the result. The mocked
    cloud-config is empty.
    """
    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = [
        "affinity_group_1",
        "affinity_group_2",
    ]

    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    expected_extra_dict3 = deepcopy(expected_extra_dict2)
    expected_extra_dict3["params"]["affinity_group_list"] = [
        "affinity_group_1",
        "affinity_group_2",
    ]
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
    )
    self.assertDictEqual(result, expected_extra_dict3)
    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        expected_extra_dict3,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )

    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()

5080 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_cloud_config(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Process vdu params when cloud-init yields a cloud-config.

    The mocked cloud-config must appear under params.cloud_config in the
    result and be handed on to _prepare_vdu_ssh_keys.
    """

    def make_cloud_config():
        # A fresh dict on every use keeps the mock's return value, the
        # expected result and the asserted argument independent objects.
        return {
            "user-data": user_data,
            "boot-data-drive": "vda",
        }

    self.maxDiff = None
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = make_cloud_config()
    mock_prepare_vdu_affinity_group_list.return_value = []

    call_kwargs = deepcopy(kwargs)
    call_kwargs.update(
        vnfr_id=vnfr_id,
        nsr_id=nsr_id,
        tasks_by_target_record_id={},
        logger="logger",
    )
    wanted_extra = deepcopy(expected_extra_dict2)
    wanted_extra["params"]["cloud_config"] = make_cloud_config()
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    outcome = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        wanted_extra,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(outcome, wanted_extra)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(
        vdu, None, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()

5154 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wthout_persistent_storage(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Process vdu params for a vdu and vnfd without persistent storage.

    Cloud-config and the affinity group list are mocked as empty. The
    result must equal expected_extra_dict2 and find_persistent_volumes
    must not be called.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []

    call_kwargs = deepcopy(kwargs)
    call_kwargs.update(
        vnfr_id=vnfr_id,
        nsr_id=nsr_id,
        tasks_by_target_record_id={},
        logger="logger",
    )
    wanted_extra = deepcopy(expected_extra_dict2)
    db.get_one.return_value = deepcopy(vnfd_wthout_persistent_storage)

    outcome = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu,
        wanted_extra,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(outcome, wanted_extra)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()

5219 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_interfaces_partially_located(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Only one of the three interfaces carries a "position" entry.

    The test expects `_partially_locate_vdu_interfaces` to be called with the
    VDU and `_sort_vdu_interfaces` not to be called at all. The returned dict
    must equal a copy of `expected_extra_dict`.
    """
    self.maxDiff = None

    # Arrange: three interfaces, only "vdu-eth2" has a position.
    vdu_under_test = deepcopy(target_vdu_wth_persistent_storage)
    vdu_under_test["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2},
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
    ]
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    # Configured for completeness; the assertions below require that this
    # mock is never actually invoked.
    mock_find_persistent_root_volumes.return_value = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    # Act
    result = Ns._process_vdu_params(
        vdu_under_test,
        indata,
        vim_info=None,
        target_record_id=None,
        **call_kwargs,
    )
    expected = deepcopy(expected_extra_dict)

    # Assert: interface handling went through the "partially locate" path only.
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(vdu_under_test)
    for called_once in (
        mock_prepare_vdu_cloud_init,
        mock_add_persistent_root_disk_to_disk_list,
        mock_add_persistent_ordinary_disks_to_disk_list,
    ):
        called_once.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu_under_test,
        expected,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu_under_test, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    for never_called in (
        mock_find_persistent_volumes,
        mock_find_persistent_root_volumes,
    ):
        never_called.assert_not_called()

5304 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_no_interface_position(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Process a VDU whose interfaces come from `interfaces_wthout_positions`.

    The test expects `_partially_locate_vdu_interfaces` to be called with the
    VDU and `_sort_vdu_interfaces` not to be called. The returned dict must
    equal a copy of `expected_extra_dict`.
    """
    self.maxDiff = None

    # Arrange
    vdu_under_test = deepcopy(target_vdu_wth_persistent_storage)
    vdu_under_test["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    # Configured for completeness; the assertions below require that this
    # mock is never actually invoked.
    mock_find_persistent_root_volumes.return_value = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    # Act
    result = Ns._process_vdu_params(
        vdu_under_test,
        indata,
        vim_info=None,
        target_record_id=None,
        **call_kwargs,
    )
    expected = deepcopy(expected_extra_dict)

    # Assert
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(vdu_under_test)
    for called_once in (
        mock_prepare_vdu_cloud_init,
        mock_add_persistent_root_disk_to_disk_list,
        mock_add_persistent_ordinary_disks_to_disk_list,
    ):
        called_once.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu_under_test,
        expected,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(result, expected)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu_under_test, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    for never_called in (
        mock_find_persistent_volumes,
        mock_find_persistent_root_volumes,
    ):
        never_called.assert_not_called()

5378 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Prepare vdu interfaces method raises exception.

    `_prepare_vdu_interfaces` is mocked to raise `TypeError`. The test
    requires that exact exception type to propagate out of
    `Ns._process_vdu_params` and that none of the later preparation helpers
    (cloud-init, ssh keys, disks, affinity groups) are invoked.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    mock_find_persistent_root_volumes.return_value = persistent_root_disk
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    mock_prepare_vdu_interfaces.side_effect = TypeError

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Check the concrete exception type through assertRaises itself. A
    # follow-up `type(err)` comparison cannot do this: placed after the
    # raising call inside the `with` body it never executes, and `err` is the
    # assertRaises context manager rather than the raised exception.
    with self.assertRaises(TypeError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_not_called()
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_not_called()
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()

5445 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_add_persistent_root_disk_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Add persistent root disk method raises exception.

    `_add_persistent_root_disk_to_disk_list` is mocked to raise `KeyError`.
    The test requires that exact exception type to propagate out of
    `Ns._process_vdu_params`, that the interface, ssh-key and cloud-init
    helpers were already called, and that the ordinary-disk and
    affinity-group helpers were not reached.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Check the concrete exception type through assertRaises itself. A
    # follow-up `type(err)` comparison cannot do this: placed after the
    # raising call inside the `with` body it never executes, and `err` is the
    # assertRaises context manager rather than the raised exception.
    with self.assertRaises(KeyError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        {
            "depends_on": [
                f"{ns_preffix}:image.0",
                f"{ns_preffix}:flavor.0",
            ]
        },
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )

    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()

5518 

def test_select_persistent_root_disk(self):
    """The descriptor is returned when the VDU lists "persistent-root-volume" first."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    # Second storage descriptor of the VNFD fixture, copied so the shared
    # fixture itself is not handed to the code under test.
    root_vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]

    selected = Ns._select_persistent_root_disk(root_vsd, vdu)

    self.assertEqual(selected, root_vsd)

5530 

def test_select_persistent_root_disk_first_vsd_is_different(self):
    """No disk is selected when the VDU's first storage entry is "persistent-volume2"."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    candidate_vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]

    selected = Ns._select_persistent_root_disk(candidate_vsd, vdu)

    self.assertEqual(selected, None)

5543 

def test_select_persistent_root_disk_vsd_is_not_persistent(self):
    """vsd type is not persistent.

    The VDU lists "persistent-root-volume" first, the same ordering for which
    `test_select_persistent_root_disk` expects the descriptor to be returned.
    The only difference here is the overridden "type-of-storage", so a `None`
    result is attributable to the storage type rather than to the ordering.
    NOTE(review): assumes `_select_persistent_root_disk` rejects non-persistent
    storage types, as this test's name states; confirm against the
    implementation.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vsd["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertIsNone(result)

5557 

def test_select_persistent_root_disk_vsd_does_not_have_size(self):
    """vsd size is None.

    The VDU lists "persistent-root-volume" first, the same ordering for which
    `test_select_persistent_root_disk` expects the descriptor to be returned.
    The only difference here is "size-of-storage" being None, so a `None`
    result is attributable to the missing size rather than to the ordering.
    NOTE(review): assumes `_select_persistent_root_disk` rejects descriptors
    without a size, as this test's name states; confirm against the
    implementation.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vsd["size-of-storage"] = None
    result = Ns._select_persistent_root_disk(vsd, vdu)
    self.assertIsNone(result)

5571 

def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """No disk is selected for the unmodified target VDU fixture.

    Unlike the sibling tests, "virtual-storage-desc" is not assigned on the
    VDU copy before the call.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    candidate_vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]

    selected = Ns._select_persistent_root_disk(candidate_vsd, vdu)

    self.assertEqual(selected, None)

5579 

def test_select_persistent_root_disk_invalid_vsd_type(self):
    """Passing the whole storage-descriptor list instead of one entry raises AttributeError."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # The complete "virtual-storage-desc" value is used here, not a single
    # element of it as in the sibling tests.
    all_vsds = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]

    with self.assertRaises(AttributeError):
        Ns._select_persistent_root_disk(all_vsds, vdu)