Coverage for NG-RO/osm_ng_ro/tests/test_ns.py: 99%

1909 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-14 12:04 +0000

#######################################################################################
# Copyright ETSI Contributors and Others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################

17from copy import deepcopy 

18import unittest 

19from unittest.mock import MagicMock, Mock, patch 

20 

21from jinja2 import ( 

22 Environment, 

23 select_autoescape, 

24 StrictUndefined, 

25 TemplateError, 

26 TemplateNotFound, 

27 UndefinedError, 

28) 

29from osm_ng_ro.ns import Ns, NsException 

30 

31 

32__author__ = "Eduardo Sousa" 

33__date__ = "$19-NOV-2021 00:00:00$" 

34 

35 

36# Variables used in Tests 

# VNF descriptor whose single VDU references two persistent volumes and one
# ephemeral volume.
vnfd_wth_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}],
        }
    ],
    "id": "several_volumes-vnf",
    "product-name": "several_volumes-vnf",
    "vdu": [
        {
            "id": "several_volumes-VM",
            "name": "several_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": [
                "ubuntu20.04-aws",
                "ubuntu20.04-azure",
            ],
            "virtual-storage-desc": [
                "persistent-root-volume",
                "persistent-volume2",
                "ephemeral-volume",
            ],
        }
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {
            "id": "persistent-volume2",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
        },
        {
            "id": "persistent-root-volume",
            "type-of-storage": "persistent-storage:persistent-storage",
            "size-of-storage": "10",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {
            "fs": "mongo",
            "path": "/app/storage/",
        },
        "type": "vnfd",
    },
}
vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3"
# Same content as ``tasks_by_target_record_id`` further down; both names are
# kept because code outside this section may reference either one.
task_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {"params": {"net_type": "SR-IOV"}}
    }
}
# Interface lists with and without an explicit "position" key.
interfaces_wthout_positions = [
    {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
    },
    {
        "name": "vdu-eth2",
        "ns-vld-id": "net2",
    },
    {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
    },
]
interfaces_wth_all_positions = [
    {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "position": 2,
    },
    {
        "name": "vdu-eth2",
        "ns-vld-id": "net2",
        "position": 0,
    },
    {
        "name": "vdu-eth3",
        "ns-vld-id": "mgmtnet",
        "position": 1,
    },
]
# Target VDU record matching ``vnfd_wth_persistent_storage``.
target_vdu_wth_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "several_volumes-VM",
    "interfaces": [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        }
    ],
    "virtual-storages": [
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        },
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
db = MagicMock(name="database mock")
# The mock name only shows up in repr()/failure messages; it previously read
# "database mock", copy-pasted from ``db`` above.
fs = MagicMock(name="filesystem mock")
ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5"
vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5"
nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3"
indata = {
    "name": "sample_name",
}
# Expected extra_dict for the VDU described by ``vnfd_wth_persistent_storage``.
expected_extra_dict = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "security_group_name": None,
        "description": "several_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volumes-several_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}

# Same shape as ``expected_extra_dict`` but for the "without_volumes-VM" VDU.
expected_extra_dict2 = {
    "depends_on": [
        f"{ns_preffix}:image.0",
        f"{ns_preffix}:flavor.0",
    ],
    "params": {
        "affinity_group_list": [],
        "availability_zone_index": None,
        "availability_zone_list": None,
        "cloud_config": None,
        "security_group_name": None,
        "description": "without_volumes-VM",
        "disk_list": [],
        "flavor_id": f"TASK-{ns_preffix}:flavor.0",
        "image_id": f"TASK-{ns_preffix}:image.0",
        "name": "sample_name-vnf-several-volumes-without_volumes-VM-0",
        "net_list": [],
        "start": True,
    },
}

tasks_by_target_record_id = {
    "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": {
        "extra_dict": {
            "params": {
                "net_type": "SR-IOV",
            }
        }
    }
}
kwargs = {
    "db": MagicMock(),
    "vdu2cloud_init": {},
    "vnfr": {
        "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "member-vnf-index-ref": "vnf-several-volumes",
    },
}

# VNF descriptor whose VDU only uses a plain root volume and an ephemeral
# volume (no "persistent-storage" type anywhere).
vnfd_wthout_persistent_storage = {
    "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
    "df": [
        {
            "id": "default-df",
            "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}],
        }
    ],
    "id": "without_volumes-vnf",
    "product-name": "without_volumes-vnf",
    "vdu": [
        {
            "id": "without_volumes-VM",
            "name": "without_volumes-VM",
            "sw-image-desc": "ubuntu20.04",
            "alternative-sw-image-desc": [
                "ubuntu20.04-aws",
                "ubuntu20.04-azure",
            ],
            "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
        }
    ],
    "version": "1.0",
    "virtual-storage-desc": [
        {"id": "root-volume", "size-of-storage": "10"},
        {
            "id": "ephemeral-volume",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "1",
        },
    ],
    "_admin": {
        "storage": {
            "fs": "mongo",
            "path": "/app/storage/",
        },
        "type": "vnfd",
    },
}

# Target VDU record matching ``vnfd_wthout_persistent_storage``.
target_vdu_wthout_persistent_storage = {
    "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960",
    "ns-flavor-id": "0",
    "ns-image-id": "0",
    "vdu-name": "without_volumes-VM",
    "interfaces": [
        {
            "name": "vdu-eth0",
            "ns-vld-id": "mgmtnet",
        }
    ],
    "virtual-storages": [
        {
            "id": "root-volume",
            "size-of-storage": "10",
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ],
}
# NOTE(review): the extracted listing flattened the whitespace inside the two
# triple-quoted strings below. The nesting here is reconstructed so that both
# strings stay byte-consistent with each other (``user_data`` is
# ``cloud_init_content`` with its three placeholders filled in) -- confirm the
# exact indentation against the upstream file.
cloud_init_content = """
disk_setup:
    ephemeral0:
        table_type: {{type}}
        layout: True
        overwrite: {{is_override}}
runcmd:
     - [ ls, -l, / ]
     - [ sh, -xc, "echo $(date) '{{command}}'" ]
"""

user_data = """
disk_setup:
    ephemeral0:
        table_type: mbr
        layout: True
        overwrite: False
runcmd:
     - [ ls, -l, / ]
     - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
"""

# Identifiers shared by the vertical-scale / migrate / rebuild fixtures below.
vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f"
target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
action_id = "bb937f49-3870-4169-b758-9732e1ff40f3"
nsr_id_2 = "993166fe-723e-4680-ac4b-b1af2541ae31"
target_record_1 = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
target_record_id = (
    "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:"
    "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1"
)
# The expected-result dictionaries reference the constants above wherever the
# original repeated the same literal, so the fixtures cannot drift apart.
expected_result_vertical_scale = {
    "target_id": target_vim,
    "action_id": action_id,
    "nsr_id": nsr_id_2,
    "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
    "status": "SCHEDULED",
    "action": "EXEC",
    "item": "verticalscale",
    "target_record": target_record_1,
    "target_record_id": target_record_id,
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "flavor_dict": "flavor_dict",
        "flavor_id": "TASK-nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0",
    },
    "depends_on": ["nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0"],
}
vdu = {
    "id": vdu_id,
    "vim_info": {target_vim: {"interfaces": []}},
    "ns-flavor-id": "0",
}
vnf = {"_id": vnf_id}
extra_dict_vertical_scale = {
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "flavor_dict": "flavor_dict",
    },
}
extra_dict_migrate = {
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "migrate_host": "migrateToHost",
    },
}
expected_result_migrate = {
    "target_id": target_vim,
    "action_id": action_id,
    "nsr_id": nsr_id_2,
    "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
    "status": "SCHEDULED",
    "action": "EXEC",
    "item": "migrate",
    "target_record": target_record_1,
    "target_record_id": target_record_id,
    "params": {
        "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
        "migrate_host": "migrateToHost",
    },
}
expected_result_rebuild_start_stop = {
    "target_id": target_vim,
    "action_id": action_id,
    "nsr_id": nsr_id_2,
    "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0",
    "status": "SCHEDULED",
    "action": "EXEC",
    "item": "update",
    "target_record_id": target_record_id,
}

383 

384 

class TestException(Exception):
    """Exception type local to this test module.

    NOTE(review): its users are outside this excerpt; presumably it is raised
    through mock side effects -- confirm.
    """

    pass

387 

388 

class CopyingMock(MagicMock):
    """MagicMock that records deep copies of the arguments it is called with.

    A plain mock stores references to its call arguments, so a caller that
    mutates an argument after the call also changes what the mock "remembers".
    Copying at call time lets assertions see the values as they were passed.
    """

    def __call__(self, *args, **kwargs):
        """Deep-copy positional and keyword arguments, then record the call."""
        return super().__call__(*deepcopy(args), **deepcopy(kwargs))

394 

395 

class TestNs(unittest.TestCase):
    """Unit tests for the helper methods of ``osm_ng_ro.ns.Ns``."""

    def setUp(self):
        """Nothing to prepare: the tests build their own inputs."""
        pass

399 

400 def test__create_task_without_extra_dict(self): 

401 expected_result = { 

402 "target_id": "vim_openstack_1", 

403 "action_id": "123456", 

404 "nsr_id": "654321", 

405 "task_id": "123456:1", 

406 "status": "SCHEDULED", 

407 "action": "CREATE", 

408 "item": "test_item", 

409 "target_record": "test_target_record", 

410 "target_record_id": "test_target_record_id", 

411 } 

412 deployment_info = { 

413 "action_id": "123456", 

414 "nsr_id": "654321", 

415 "task_index": 1, 

416 } 

417 

418 task = Ns._create_task( 

419 deployment_info=deployment_info, 

420 target_id="vim_openstack_1", 

421 item="test_item", 

422 action="CREATE", 

423 target_record="test_target_record", 

424 target_record_id="test_target_record_id", 

425 ) 

426 

427 self.assertEqual(deployment_info.get("task_index"), 2) 

428 self.assertDictEqual(task, expected_result) 

429 

430 def test__create_task(self): 

431 expected_result = { 

432 "target_id": "vim_openstack_1", 

433 "action_id": "123456", 

434 "nsr_id": "654321", 

435 "task_id": "123456:1", 

436 "status": "SCHEDULED", 

437 "action": "CREATE", 

438 "item": "test_item", 

439 "target_record": "test_target_record", 

440 "target_record_id": "test_target_record_id", 

441 # values coming from extra_dict 

442 "params": "test_params", 

443 "find_params": "test_find_params", 

444 "depends_on": "test_depends_on", 

445 } 

446 deployment_info = { 

447 "action_id": "123456", 

448 "nsr_id": "654321", 

449 "task_index": 1, 

450 } 

451 

452 task = Ns._create_task( 

453 deployment_info=deployment_info, 

454 target_id="vim_openstack_1", 

455 item="test_item", 

456 action="CREATE", 

457 target_record="test_target_record", 

458 target_record_id="test_target_record_id", 

459 extra_dict={ 

460 "params": "test_params", 

461 "find_params": "test_find_params", 

462 "depends_on": "test_depends_on", 

463 }, 

464 ) 

465 

466 self.assertEqual(deployment_info.get("task_index"), 2) 

467 self.assertDictEqual(task, expected_result) 

468 

469 @patch("osm_ng_ro.ns.time") 

470 def test__create_ro_task(self, mock_time: Mock): 

471 now = 1637324838.994551 

472 mock_time.return_value = now 

473 task = { 

474 "target_id": "vim_openstack_1", 

475 "action_id": "123456", 

476 "nsr_id": "654321", 

477 "task_id": "123456:1", 

478 "status": "SCHEDULED", 

479 "action": "CREATE", 

480 "item": "test_item", 

481 "target_record": "test_target_record", 

482 "target_record_id": "test_target_record_id", 

483 # values coming from extra_dict 

484 "params": "test_params", 

485 "find_params": "test_find_params", 

486 "depends_on": "test_depends_on", 

487 } 

488 expected_result = { 

489 "_id": "123456:1", 

490 "locked_by": None, 

491 "locked_at": 0.0, 

492 "target_id": "vim_openstack_1", 

493 "vim_info": { 

494 "created": False, 

495 "created_items": None, 

496 "vim_id": None, 

497 "vim_name": None, 

498 "vim_status": None, 

499 "vim_details": None, 

500 "vim_message": None, 

501 "refresh_at": None, 

502 }, 

503 "modified_at": now, 

504 "created_at": now, 

505 "to_check_at": now, 

506 "tasks": [task], 

507 } 

508 

509 ro_task = Ns._create_ro_task( 

510 target_id="vim_openstack_1", 

511 task=task, 

512 ) 

513 

514 self.assertDictEqual(ro_task, expected_result) 

515 

516 def test__process_image_params_with_empty_target_image(self): 

517 expected_result = { 

518 "find_params": {}, 

519 } 

520 target_image = {} 

521 

522 result = Ns._process_image_params( 

523 target_image=target_image, 

524 indata=None, 

525 vim_info=None, 

526 target_record_id=None, 

527 ) 

528 

529 self.assertDictEqual(expected_result, result) 

530 

531 def test__process_image_params_with_wrong_target_image(self): 

532 expected_result = { 

533 "find_params": {}, 

534 } 

535 target_image = { 

536 "no_image": "to_see_here", 

537 } 

538 

539 result = Ns._process_image_params( 

540 target_image=target_image, 

541 indata=None, 

542 vim_info=None, 

543 target_record_id=None, 

544 ) 

545 

546 self.assertDictEqual(expected_result, result) 

547 

548 def test__process_image_params_with_image(self): 

549 expected_result = { 

550 "find_params": { 

551 "filter_dict": { 

552 "name": "cirros", 

553 }, 

554 }, 

555 } 

556 target_image = { 

557 "image": "cirros", 

558 } 

559 

560 result = Ns._process_image_params( 

561 target_image=target_image, 

562 indata=None, 

563 vim_info=None, 

564 target_record_id=None, 

565 ) 

566 

567 self.assertDictEqual(expected_result, result) 

568 

569 def test__process_image_params_with_vim_image_id(self): 

570 expected_result = { 

571 "find_params": { 

572 "filter_dict": { 

573 "id": "123456", 

574 }, 

575 }, 

576 } 

577 target_image = { 

578 "vim_image_id": "123456", 

579 } 

580 

581 result = Ns._process_image_params( 

582 target_image=target_image, 

583 indata=None, 

584 vim_info=None, 

585 target_record_id=None, 

586 ) 

587 

588 self.assertDictEqual(expected_result, result) 

589 

590 def test__process_image_params_with_image_checksum(self): 

591 expected_result = { 

592 "find_params": { 

593 "filter_dict": { 

594 "checksum": "e3fc50a88d0a364313df4b21ef20c29e", 

595 }, 

596 }, 

597 } 

598 target_image = { 

599 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e", 

600 } 

601 

602 result = Ns._process_image_params( 

603 target_image=target_image, 

604 indata=None, 

605 vim_info=None, 

606 target_record_id=None, 

607 ) 

608 

609 self.assertDictEqual(expected_result, result) 

610 

611 def test__get_resource_allocation_params_with_empty_target_image(self): 

612 expected_result = {} 

613 quota_descriptor = {} 

614 

615 result = Ns._get_resource_allocation_params( 

616 quota_descriptor=quota_descriptor, 

617 ) 

618 

619 self.assertDictEqual(expected_result, result) 

620 

621 def test__get_resource_allocation_params_with_wrong_target_image(self): 

622 expected_result = {} 

623 quota_descriptor = { 

624 "no_quota": "present_here", 

625 } 

626 

627 result = Ns._get_resource_allocation_params( 

628 quota_descriptor=quota_descriptor, 

629 ) 

630 

631 self.assertDictEqual(expected_result, result) 

632 

633 def test__get_resource_allocation_params_with_limit(self): 

634 expected_result = { 

635 "limit": 10, 

636 } 

637 quota_descriptor = { 

638 "limit": "10", 

639 } 

640 

641 result = Ns._get_resource_allocation_params( 

642 quota_descriptor=quota_descriptor, 

643 ) 

644 

645 self.assertDictEqual(expected_result, result) 

646 

647 def test__get_resource_allocation_params_with_reserve(self): 

648 expected_result = { 

649 "reserve": 20, 

650 } 

651 quota_descriptor = { 

652 "reserve": "20", 

653 } 

654 

655 result = Ns._get_resource_allocation_params( 

656 quota_descriptor=quota_descriptor, 

657 ) 

658 

659 self.assertDictEqual(expected_result, result) 

660 

661 def test__get_resource_allocation_params_with_shares(self): 

662 expected_result = { 

663 "shares": 30, 

664 } 

665 quota_descriptor = { 

666 "shares": "30", 

667 } 

668 

669 result = Ns._get_resource_allocation_params( 

670 quota_descriptor=quota_descriptor, 

671 ) 

672 

673 self.assertDictEqual(expected_result, result) 

674 

675 def test__get_resource_allocation_params(self): 

676 expected_result = { 

677 "limit": 10, 

678 "reserve": 20, 

679 "shares": 30, 

680 } 

681 quota_descriptor = { 

682 "limit": "10", 

683 "reserve": "20", 

684 "shares": "30", 

685 } 

686 

687 result = Ns._get_resource_allocation_params( 

688 quota_descriptor=quota_descriptor, 

689 ) 

690 

691 self.assertDictEqual(expected_result, result) 

692 

693 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

694 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu( 

695 self, 

696 resource_allocation, 

697 ): 

698 expected_result = {} 

699 guest_epa_quota = {} 

700 epa_vcpu_set = True 

701 

702 result = Ns._process_guest_epa_quota_params( 

703 guest_epa_quota=guest_epa_quota, 

704 epa_vcpu_set=epa_vcpu_set, 

705 ) 

706 

707 self.assertDictEqual(expected_result, result) 

708 self.assertFalse(resource_allocation.called) 

709 

710 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

711 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu( 

712 self, 

713 resource_allocation, 

714 ): 

715 expected_result = {} 

716 guest_epa_quota = {} 

717 epa_vcpu_set = False 

718 

719 result = Ns._process_guest_epa_quota_params( 

720 guest_epa_quota=guest_epa_quota, 

721 epa_vcpu_set=epa_vcpu_set, 

722 ) 

723 

724 self.assertDictEqual(expected_result, result) 

725 self.assertFalse(resource_allocation.called) 

726 

727 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

728 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu( 

729 self, 

730 resource_allocation, 

731 ): 

732 expected_result = {} 

733 guest_epa_quota = { 

734 "no-quota": "nothing", 

735 } 

736 epa_vcpu_set = True 

737 

738 result = Ns._process_guest_epa_quota_params( 

739 guest_epa_quota=guest_epa_quota, 

740 epa_vcpu_set=epa_vcpu_set, 

741 ) 

742 

743 self.assertDictEqual(expected_result, result) 

744 self.assertFalse(resource_allocation.called) 

745 

746 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

747 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu( 

748 self, 

749 resource_allocation, 

750 ): 

751 expected_result = {} 

752 guest_epa_quota = { 

753 "no-quota": "nothing", 

754 } 

755 epa_vcpu_set = False 

756 

757 result = Ns._process_guest_epa_quota_params( 

758 guest_epa_quota=guest_epa_quota, 

759 epa_vcpu_set=epa_vcpu_set, 

760 ) 

761 

762 self.assertDictEqual(expected_result, result) 

763 self.assertFalse(resource_allocation.called) 

764 

765 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

766 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu( 

767 self, 

768 resource_allocation, 

769 ): 

770 expected_result = {} 

771 guest_epa_quota = { 

772 "cpu-quota": { 

773 "limit": "10", 

774 "reserve": "20", 

775 "shares": "30", 

776 }, 

777 } 

778 epa_vcpu_set = True 

779 

780 result = Ns._process_guest_epa_quota_params( 

781 guest_epa_quota=guest_epa_quota, 

782 epa_vcpu_set=epa_vcpu_set, 

783 ) 

784 

785 self.assertDictEqual(expected_result, result) 

786 self.assertFalse(resource_allocation.called) 

787 

788 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

789 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu( 

790 self, 

791 resource_allocation, 

792 ): 

793 expected_result = { 

794 "cpu-quota": { 

795 "limit": 10, 

796 "reserve": 20, 

797 "shares": 30, 

798 }, 

799 } 

800 guest_epa_quota = { 

801 "cpu-quota": { 

802 "limit": "10", 

803 "reserve": "20", 

804 "shares": "30", 

805 }, 

806 } 

807 epa_vcpu_set = False 

808 

809 resource_allocation_param = { 

810 "limit": "10", 

811 "reserve": "20", 

812 "shares": "30", 

813 } 

814 resource_allocation.return_value = { 

815 "limit": 10, 

816 "reserve": 20, 

817 "shares": 30, 

818 } 

819 

820 result = Ns._process_guest_epa_quota_params( 

821 guest_epa_quota=guest_epa_quota, 

822 epa_vcpu_set=epa_vcpu_set, 

823 ) 

824 

825 resource_allocation.assert_called_once_with(resource_allocation_param) 

826 self.assertDictEqual(expected_result, result) 

827 

828 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

829 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu( 

830 self, 

831 resource_allocation, 

832 ): 

833 expected_result = { 

834 "mem-quota": { 

835 "limit": 10, 

836 "reserve": 20, 

837 "shares": 30, 

838 }, 

839 } 

840 guest_epa_quota = { 

841 "mem-quota": { 

842 "limit": "10", 

843 "reserve": "20", 

844 "shares": "30", 

845 }, 

846 } 

847 epa_vcpu_set = True 

848 

849 resource_allocation_param = { 

850 "limit": "10", 

851 "reserve": "20", 

852 "shares": "30", 

853 } 

854 resource_allocation.return_value = { 

855 "limit": 10, 

856 "reserve": 20, 

857 "shares": 30, 

858 } 

859 

860 result = Ns._process_guest_epa_quota_params( 

861 guest_epa_quota=guest_epa_quota, 

862 epa_vcpu_set=epa_vcpu_set, 

863 ) 

864 

865 resource_allocation.assert_called_once_with(resource_allocation_param) 

866 self.assertDictEqual(expected_result, result) 

867 

868 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

869 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu( 

870 self, 

871 resource_allocation, 

872 ): 

873 expected_result = { 

874 "mem-quota": { 

875 "limit": 10, 

876 "reserve": 20, 

877 "shares": 30, 

878 }, 

879 } 

880 guest_epa_quota = { 

881 "mem-quota": { 

882 "limit": "10", 

883 "reserve": "20", 

884 "shares": "30", 

885 }, 

886 } 

887 epa_vcpu_set = False 

888 

889 resource_allocation_param = { 

890 "limit": "10", 

891 "reserve": "20", 

892 "shares": "30", 

893 } 

894 resource_allocation.return_value = { 

895 "limit": 10, 

896 "reserve": 20, 

897 "shares": 30, 

898 } 

899 

900 result = Ns._process_guest_epa_quota_params( 

901 guest_epa_quota=guest_epa_quota, 

902 epa_vcpu_set=epa_vcpu_set, 

903 ) 

904 

905 resource_allocation.assert_called_once_with(resource_allocation_param) 

906 self.assertDictEqual(expected_result, result) 

907 

908 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

909 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu( 

910 self, 

911 resource_allocation, 

912 ): 

913 expected_result = { 

914 "disk-io-quota": { 

915 "limit": 10, 

916 "reserve": 20, 

917 "shares": 30, 

918 }, 

919 } 

920 guest_epa_quota = { 

921 "disk-io-quota": { 

922 "limit": "10", 

923 "reserve": "20", 

924 "shares": "30", 

925 }, 

926 } 

927 epa_vcpu_set = True 

928 

929 resource_allocation_param = { 

930 "limit": "10", 

931 "reserve": "20", 

932 "shares": "30", 

933 } 

934 resource_allocation.return_value = { 

935 "limit": 10, 

936 "reserve": 20, 

937 "shares": 30, 

938 } 

939 

940 result = Ns._process_guest_epa_quota_params( 

941 guest_epa_quota=guest_epa_quota, 

942 epa_vcpu_set=epa_vcpu_set, 

943 ) 

944 

945 resource_allocation.assert_called_once_with(resource_allocation_param) 

946 self.assertDictEqual(expected_result, result) 

947 

948 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

949 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu( 

950 self, 

951 resource_allocation, 

952 ): 

953 expected_result = { 

954 "disk-io-quota": { 

955 "limit": 10, 

956 "reserve": 20, 

957 "shares": 30, 

958 }, 

959 } 

960 guest_epa_quota = { 

961 "disk-io-quota": { 

962 "limit": "10", 

963 "reserve": "20", 

964 "shares": "30", 

965 }, 

966 } 

967 epa_vcpu_set = False 

968 

969 resource_allocation_param = { 

970 "limit": "10", 

971 "reserve": "20", 

972 "shares": "30", 

973 } 

974 resource_allocation.return_value = { 

975 "limit": 10, 

976 "reserve": 20, 

977 "shares": 30, 

978 } 

979 

980 result = Ns._process_guest_epa_quota_params( 

981 guest_epa_quota=guest_epa_quota, 

982 epa_vcpu_set=epa_vcpu_set, 

983 ) 

984 

985 resource_allocation.assert_called_once_with(resource_allocation_param) 

986 self.assertDictEqual(expected_result, result) 

987 

988 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

989 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu( 

990 self, 

991 resource_allocation, 

992 ): 

993 expected_result = { 

994 "vif-quota": { 

995 "limit": 10, 

996 "reserve": 20, 

997 "shares": 30, 

998 }, 

999 } 

1000 guest_epa_quota = { 

1001 "vif-quota": { 

1002 "limit": "10", 

1003 "reserve": "20", 

1004 "shares": "30", 

1005 }, 

1006 } 

1007 epa_vcpu_set = True 

1008 

1009 resource_allocation_param = { 

1010 "limit": "10", 

1011 "reserve": "20", 

1012 "shares": "30", 

1013 } 

1014 resource_allocation.return_value = { 

1015 "limit": 10, 

1016 "reserve": 20, 

1017 "shares": 30, 

1018 } 

1019 

1020 result = Ns._process_guest_epa_quota_params( 

1021 guest_epa_quota=guest_epa_quota, 

1022 epa_vcpu_set=epa_vcpu_set, 

1023 ) 

1024 

1025 resource_allocation.assert_called_once_with(resource_allocation_param) 

1026 self.assertDictEqual(expected_result, result) 

1027 

1028 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

1029 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu( 

1030 self, 

1031 resource_allocation, 

1032 ): 

1033 expected_result = { 

1034 "vif-quota": { 

1035 "limit": 10, 

1036 "reserve": 20, 

1037 "shares": 30, 

1038 }, 

1039 } 

1040 guest_epa_quota = { 

1041 "vif-quota": { 

1042 "limit": "10", 

1043 "reserve": "20", 

1044 "shares": "30", 

1045 }, 

1046 } 

1047 epa_vcpu_set = False 

1048 

1049 resource_allocation_param = { 

1050 "limit": "10", 

1051 "reserve": "20", 

1052 "shares": "30", 

1053 } 

1054 resource_allocation.return_value = { 

1055 "limit": 10, 

1056 "reserve": 20, 

1057 "shares": 30, 

1058 } 

1059 

1060 result = Ns._process_guest_epa_quota_params( 

1061 guest_epa_quota=guest_epa_quota, 

1062 epa_vcpu_set=epa_vcpu_set, 

1063 ) 

1064 

1065 resource_allocation.assert_called_once_with(resource_allocation_param) 

1066 self.assertDictEqual(expected_result, result) 

1067 

1068 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

1069 def test__process_guest_epa_quota_params_with_quota_epa_cpu( 

1070 self, 

1071 resource_allocation, 

1072 ): 

1073 expected_result = { 

1074 "mem-quota": { 

1075 "limit": 10, 

1076 "reserve": 20, 

1077 "shares": 30, 

1078 }, 

1079 "disk-io-quota": { 

1080 "limit": 10, 

1081 "reserve": 20, 

1082 "shares": 30, 

1083 }, 

1084 "vif-quota": { 

1085 "limit": 10, 

1086 "reserve": 20, 

1087 "shares": 30, 

1088 }, 

1089 } 

1090 guest_epa_quota = { 

1091 "cpu-quota": { 

1092 "limit": "10", 

1093 "reserve": "20", 

1094 "shares": "30", 

1095 }, 

1096 "mem-quota": { 

1097 "limit": "10", 

1098 "reserve": "20", 

1099 "shares": "30", 

1100 }, 

1101 "disk-io-quota": { 

1102 "limit": "10", 

1103 "reserve": "20", 

1104 "shares": "30", 

1105 }, 

1106 "vif-quota": { 

1107 "limit": "10", 

1108 "reserve": "20", 

1109 "shares": "30", 

1110 }, 

1111 } 

1112 epa_vcpu_set = True 

1113 

1114 resource_allocation.return_value = { 

1115 "limit": 10, 

1116 "reserve": 20, 

1117 "shares": 30, 

1118 } 

1119 

1120 result = Ns._process_guest_epa_quota_params( 

1121 guest_epa_quota=guest_epa_quota, 

1122 epa_vcpu_set=epa_vcpu_set, 

1123 ) 

1124 

1125 self.assertTrue(resource_allocation.called) 

1126 self.assertDictEqual(expected_result, result) 

1127 

1128 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

1129 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set( 

1130 self, 

1131 resource_allocation, 

1132 ): 

1133 expected_result = { 

1134 "cpu-quota": { 

1135 "limit": 10, 

1136 "reserve": 20, 

1137 "shares": 30, 

1138 }, 

1139 "mem-quota": { 

1140 "limit": 10, 

1141 "reserve": 20, 

1142 "shares": 30, 

1143 }, 

1144 "disk-io-quota": { 

1145 "limit": 10, 

1146 "reserve": 20, 

1147 "shares": 30, 

1148 }, 

1149 "vif-quota": { 

1150 "limit": 10, 

1151 "reserve": 20, 

1152 "shares": 30, 

1153 }, 

1154 } 

1155 guest_epa_quota = { 

1156 "cpu-quota": { 

1157 "limit": "10", 

1158 "reserve": "20", 

1159 "shares": "30", 

1160 }, 

1161 "mem-quota": { 

1162 "limit": "10", 

1163 "reserve": "20", 

1164 "shares": "30", 

1165 }, 

1166 "disk-io-quota": { 

1167 "limit": "10", 

1168 "reserve": "20", 

1169 "shares": "30", 

1170 }, 

1171 "vif-quota": { 

1172 "limit": "10", 

1173 "reserve": "20", 

1174 "shares": "30", 

1175 }, 

1176 } 

1177 epa_vcpu_set = False 

1178 

1179 resource_allocation.return_value = { 

1180 "limit": 10, 

1181 "reserve": 20, 

1182 "shares": 30, 

1183 } 

1184 

1185 result = Ns._process_guest_epa_quota_params( 

1186 guest_epa_quota=guest_epa_quota, 

1187 epa_vcpu_set=epa_vcpu_set, 

1188 ) 

1189 

1190 self.assertTrue(resource_allocation.called) 

1191 self.assertDictEqual(expected_result, result) 

1192 

def test__process_guest_epa_numa_params_with_empty_numa_params(self):
    """An empty input dict gives an empty NUMA list and a False vCPU flag."""
    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(guest_epa_quota={})

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1203 

def test__process_guest_epa_numa_params_with_wrong_numa_params(self):
    """An input without a NUMA policy key gives an empty list and a False flag."""
    unrelated_input = {"no_nume": "here"}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=unrelated_input,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1215 

def test__process_guest_epa_numa_params_with_numa_node_policy(self):
    """An empty "numa-node-policy" gives an empty list and a False flag."""
    empty_policy = {"numa-node-policy": {}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=empty_policy,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1227 

def test__process_guest_epa_numa_params_with_no_node(self):
    """A policy whose "node" list is empty gives an empty list and a False flag."""
    policy_without_nodes = {"numa-node-policy": {"node": []}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=policy_without_nodes,
    )

    self.assertEqual([], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1243 

def test__process_guest_epa_numa_params_with_1_node_num_cores(self):
    """A node with "num-cores" is reported as "cores" and sets the vCPU flag."""
    numa_policy = {"numa-node-policy": {"node": [{"num-cores": 3}]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual([{"cores": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)

1263 

def test__process_guest_epa_numa_params_with_1_node_paired_threads(self):
    """A string "num-paired-threads" comes back as an int under "paired_threads"."""
    numa_policy = {
        "numa-node-policy": {
            "node": [{"paired-threads": {"num-paired-threads": "3"}}],
        },
    }

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual([{"paired_threads": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)

1283 

def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self):
    """Paired thread ids come back as string tuples; the vCPU flag stays False."""
    thread_pairs = [
        {"thread-a": 0, "thread-b": 1},
        {"thread-a": 4, "thread-b": 5},
    ]
    numa_policy = {
        "numa-node-policy": {
            "node": [{"paired-threads": {"paired-thread-ids": thread_pairs}}],
        },
    }

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual(
        [{"paired-threads-id": [("0", "1"), ("4", "5")]}],
        numa_nodes,
    )
    self.assertEqual(False, vcpu_flag)

1318 

def test__process_guest_epa_numa_params_with_1_node_num_threads(self):
    """A string "num-threads" comes back as an int under "threads"; flag is True."""
    numa_policy = {"numa-node-policy": {"node": [{"num-threads": "3"}]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual([{"threads": 3}], numa_nodes)
    self.assertEqual(True, vcpu_flag)

1338 

def test__process_guest_epa_numa_params_with_1_node_memory_mb(self):
    """A "memory-mb" of 2048 is reported as "memory": 2; the flag stays False."""
    numa_policy = {"numa-node-policy": {"node": [{"memory-mb": 2048}]}}

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual([{"memory": 2}], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1358 

def test__process_guest_epa_numa_params_with_1_node_vcpu(self):
    """String node and vCPU ids come back as ints; the vCPU flag stays False."""
    numa_policy = {
        "numa-node-policy": {
            "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}],
        },
    }

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual([{"id": 0, "vcpu": [0, 1]}], numa_nodes)
    self.assertEqual(False, vcpu_flag)

1379 

def test__process_guest_epa_numa_params_with_2_node_vcpu(self):
    """Two nodes with vCPU lists give two int-typed entries, in input order."""
    numa_policy = {
        "numa-node-policy": {
            "node": [
                {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]},
                {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]},
            ],
        },
    }
    wanted_nodes = [
        {"id": 0, "vcpu": [0, 1]},
        {"id": 1, "vcpu": [2, 3]},
    ]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual(wanted_nodes, numa_nodes)
    self.assertEqual(False, vcpu_flag)

1408 

def test__process_guest_epa_numa_params_with_1_node(self):
    """A node with cores, paired threads, threads and memory maps every field."""
    single_node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": [
                {"thread-a": 0, "thread-b": 1},
                {"thread-a": 4, "thread-b": 5},
            ],
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    numa_policy = {"numa-node-policy": {"node": [single_node]}}
    # The node carries no "id"/"vcpu" keys, so none are expected in the result.
    wanted_nodes = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        }
    ]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual(wanted_nodes, numa_nodes)
    self.assertEqual(True, vcpu_flag)

1453 

def test__process_guest_epa_numa_params_with_2_nodes(self):
    """Two fully populated nodes give two mapped entries, in input order."""
    first_node = {
        "num-cores": 3,
        "paired-threads": {
            "num-paired-threads": "3",
            "paired-thread-ids": [
                {"thread-a": 0, "thread-b": 1},
                {"thread-a": 4, "thread-b": 5},
            ],
        },
        "num-threads": "3",
        "memory-mb": 2048,
    }
    second_node = {
        "num-cores": 7,
        "paired-threads": {
            "num-paired-threads": "7",
            "paired-thread-ids": [
                {"thread-a": 2, "thread-b": 3},
                {"thread-a": 5, "thread-b": 6},
            ],
        },
        "num-threads": "4",
        "memory-mb": 4096,
    }
    numa_policy = {"numa-node-policy": {"node": [first_node, second_node]}}
    wanted_nodes = [
        {
            "cores": 3,
            "paired_threads": 3,
            "paired-threads-id": [("0", "1"), ("4", "5")],
            "threads": 3,
            "memory": 2,
        },
        {
            "cores": 7,
            "paired_threads": 7,
            "paired-threads-id": [("2", "3"), ("5", "6")],
            "threads": 4,
            "memory": 4,
        },
    ]

    numa_nodes, vcpu_flag = Ns._process_guest_epa_numa_params(
        guest_epa_quota=numa_policy,
    )

    self.assertEqual(wanted_nodes, numa_nodes)
    self.assertEqual(True, vcpu_flag)

1521 

def test__process_guest_epa_cpu_pinning_params_with_empty_params(self):
    """Empty input with zero vCPUs gives an empty dict and a False flag."""
    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota={},
        vcpu_count=0,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, pinning)
    self.assertEqual(False, vcpu_flag)

1537 

def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self):
    """Input without a pinning-policy key gives an empty dict and a False flag."""
    unrelated_input = {"no-cpu-pinning-policy": "here"}

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=unrelated_input,
        vcpu_count=0,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({}, pinning)
    self.assertEqual(False, vcpu_flag)

1555 

def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self):
    """With epa_vcpu_set already True, DEDICATED adds nothing; flag stays True."""
    dedicated_policy = {"cpu-pinning-policy": "DEDICATED"}

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=dedicated_policy,
        vcpu_count=0,
        epa_vcpu_set=True,
    )

    self.assertDictEqual({}, pinning)
    self.assertEqual(True, vcpu_flag)

1573 

def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self):
    """DEDICATED with thread policy PREFER gives "threads" equal to vcpu_count."""
    pinning_policy = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
    }

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1592 

def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self):
    """DEDICATED with thread policy ISOLATE gives "cores" equal to vcpu_count."""
    pinning_policy = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "ISOLATE",
    }

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"cores": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1611 

def test__process_guest_epa_cpu_pinning_params_with_policy_require(self):
    """DEDICATED with thread policy REQUIRE gives "threads" equal to vcpu_count."""
    pinning_policy = {
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "REQUIRE",
    }

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=pinning_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1630 

def test__process_guest_epa_cpu_pinning_params(self):
    """DEDICATED with no thread policy gives "threads" equal to vcpu_count."""
    dedicated_policy = {"cpu-pinning-policy": "DEDICATED"}

    pinning, vcpu_flag = Ns._process_guest_epa_cpu_pinning_params(
        guest_epa_quota=dedicated_policy,
        vcpu_count=3,
        epa_vcpu_set=False,
    )

    self.assertDictEqual({"threads": 3}, pinning)
    self.assertEqual(True, vcpu_flag)

1648 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_empty_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """An empty flavor gives an empty result and calls none of the EPA helpers."""
    outcome = Ns._process_epa_params(target_flavor={})

    self.assertDictEqual({}, outcome)
    guest_epa_numa_params.assert_not_called()
    guest_epa_cpu_pinning_params.assert_not_called()
    guest_epa_quota_params.assert_not_called()

1669 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_wrong_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """A flavor without "guest-epa" gives an empty result and no helper calls."""
    flavor = {"no-guest-epa": "here"}

    outcome = Ns._process_epa_params(target_flavor=flavor)

    self.assertDictEqual({}, outcome)
    guest_epa_numa_params.assert_not_called()
    guest_epa_cpu_pinning_params.assert_not_called()
    guest_epa_quota_params.assert_not_called()

1692 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """The NUMA "mem-policy" reaches the result when the mocked helpers add nothing."""
    flavor = {
        "guest-epa": {
            "vcpu-count": 1,
            "numa-node-policy": {"mem-policy": "STRICT"},
        },
    }
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    outcome = Ns._process_epa_params(target_flavor=flavor)

    self.assertDictEqual({"mem-policy": "STRICT"}, outcome)
    guest_epa_numa_params.assert_called()
    guest_epa_cpu_pinning_params.assert_called()
    guest_epa_quota_params.assert_called()

1726 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_mempage_size(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """Both "mempage-size" and the NUMA "mem-policy" are copied into the result."""
    flavor = {
        "guest-epa": {
            "vcpu-count": 1,
            "mempage-size": "1G",
            "numa-node-policy": {"mem-policy": "STRICT"},
        },
    }
    guest_epa_numa_params.return_value = ({}, False)
    guest_epa_cpu_pinning_params.return_value = ({}, False)
    guest_epa_quota_params.return_value = {}

    outcome = Ns._process_epa_params(target_flavor=flavor)

    self.assertDictEqual(
        {"mempage-size": "1G", "mem-policy": "STRICT"},
        outcome,
    )
    guest_epa_numa_params.assert_called()
    guest_epa_cpu_pinning_params.assert_called()
    guest_epa_quota_params.assert_called()

1762 

@patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params")
@patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params")
def test__process_guest_epa_params_with_numa(
    self,
    guest_epa_numa_params,
    guest_epa_cpu_pinning_params,
    guest_epa_quota_params,
):
    """A full guest-EPA flavor yields policies, a "numas" list and all quotas.

    The three helpers are mocked; the expected result is the combination of
    the flavor's policy fields with the mocked helper return values.
    """
    quota_names = ("cpu-quota", "mem-quota", "disk-io-quota", "vif-quota")

    def int_quotas():
        # Fresh dicts on each call so the expectation never aliases a mock value.
        return {
            name: {"limit": 10, "reserve": 20, "shares": 30}
            for name in quota_names
        }

    flavor = {
        "guest-epa": {
            "vcpu-count": 1,
            "mempage-size": "1G",
            "cpu-pinning-policy": "DEDICATED",
            "cpu-thread-pinning-policy": "PREFER",
            "numa-node-policy": {
                "node": [
                    {
                        "num-cores": 3,
                        "paired-threads": {
                            "num-paired-threads": "3",
                            "paired-thread-ids": [
                                {"thread-a": 0, "thread-b": 1},
                                {"thread-a": 4, "thread-b": 5},
                            ],
                        },
                        "num-threads": "3",
                        "memory-mb": 2048,
                    },
                ],
            },
            **{
                name: {"limit": "10", "reserve": "20", "shares": "30"}
                for name in quota_names
            },
        },
    }
    wanted = {
        "mempage-size": "1G",
        "cpu-pinning-policy": "DEDICATED",
        "cpu-thread-pinning-policy": "PREFER",
        "numas": [
            {
                "cores": 3,
                "memory": 2,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
            }
        ],
        **int_quotas(),
    }

    guest_epa_numa_params.return_value = (
        [
            {
                "cores": 3,
                "paired-threads": 3,
                "paired-threads-id": [("0", "1"), ("4", "5")],
                "threads": 3,
                "memory": 2,
            },
        ],
        True,
    )
    guest_epa_cpu_pinning_params.return_value = ({"threads": 3}, True)
    guest_epa_quota_params.return_value = int_quotas()

    outcome = Ns._process_epa_params(target_flavor=flavor)

    self.assertEqual(wanted, outcome)
    guest_epa_numa_params.assert_called()
    guest_epa_cpu_pinning_params.assert_called()
    guest_epa_quota_params.assert_called()

1889 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_target_flavor(self, epa_params):
    """An empty target flavor raises KeyError and the EPA helper is not called."""
    request = {
        "vnf": [{"vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18"}],
    }

    with self.assertRaises(KeyError):
        Ns._process_flavor_params(
            target_flavor={},
            indata=request,
            vim_info={},
            target_record_id="",
        )

    epa_params.assert_not_called()

1915 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_target_flavor(self, epa_params):
    """A flavor lacking the sizing keys raises KeyError without calling EPA."""
    flavor = {"no-target-flavor": "here"}

    with self.assertRaises(KeyError):
        Ns._process_flavor_params(
            target_flavor=flavor,
            indata={},
            vim_info={},
            target_record_id="",
        )

    epa_params.assert_not_called()

1937 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_empty_indata(self, epa_params):
    """With empty indata, the string sizing fields come back as int flavor data."""
    flavor = {
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    sizing = {"disk": 10, "ram": 1024, "vcpus": 2}
    wanted = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": {**sizing, "name": "test"}},
    }
    epa_params.return_value = {}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata={},
        vim_info={},
        target_record_id="",
    )

    epa_params.assert_called()
    self.assertDictEqual(outcome, wanted)

1981 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_wrong_indata(self, epa_params):
    """Indata without a "vnf" key still yields the plain int flavor data."""
    flavor = {
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    request = {"no-vnf": "here"}
    sizing = {"disk": 10, "ram": 1024, "vcpus": 2}
    wanted = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": {**sizing, "name": "test"}},
    }
    epa_params.return_value = {}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=request,
        vim_info={},
        target_record_id="",
    )

    epa_params.assert_called()
    self.assertDictEqual(outcome, wanted)

2027 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_ephemeral_disk(self, epa_params):
    """An ephemeral virtual storage of size "10" adds "ephemeral": 10 to the flavor."""
    vnfd_id = "ad6356e3-698c-43bf-9901-3aae9e9b9d18"
    ephemeral_type = "etsi-nfv-descriptors:ephemeral-storage"

    # NOTE: `db` is not defined in this method; it must come from an outer scope.
    db.get_one.return_value = {
        "_id": vnfd_id,
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "vdu": [
            {
                "id": "without_volumes-VM",
                "name": "without_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {"id": "root-volume", "size-of-storage": "10"},
            {
                "id": "ephemeral-volume",
                "type-of-storage": ephemeral_type,
                "size-of-storage": "1",
            },
        ],
        "_admin": {
            "storage": {"fs": "mongo", "path": "/app/storage/"},
            "type": "vnfd",
        },
    }

    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    request = {
        "vnf": [
            {
                "vdur": [
                    {
                        "ns-flavor-id": "test_id",
                        "virtual-storages": [
                            {
                                "type-of-storage": ephemeral_type,
                                "size-of-storage": "10",
                            },
                        ],
                    },
                ],
                "vnfd-id": vnfd_id,
            },
        ],
    }
    sizing = {"disk": 10, "ram": 1024, "vcpus": 2, "ephemeral": 10}
    wanted = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": {**sizing, "name": "test"}},
    }
    epa_params.return_value = {}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=request,
        vim_info={},
        target_record_id="",
        db=db,
    )

    epa_params.assert_called()
    self.assertDictEqual(outcome, wanted)

2137 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_swap_disk(self, epa_params):
    """A swap virtual storage of size "20" adds "swap": 20 to the flavor data."""
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    request = {
        "vnf": [
            {
                "vdur": [
                    {
                        "ns-flavor-id": "test_id",
                        "virtual-storages": [
                            {
                                "type-of-storage": "etsi-nfv-descriptors:swap-storage",
                                "size-of-storage": "20",
                            },
                        ],
                    },
                ],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    sizing = {"disk": 10, "ram": 1024, "vcpus": 2, "swap": 20}
    wanted = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": {**sizing, "name": "test"}},
    }
    epa_params.return_value = {}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=request,
        vim_info={},
        target_record_id="",
    )

    epa_params.assert_called()
    self.assertDictEqual(outcome, wanted)

2201 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_persistent_root_disk(self, epa_params):
    """With a persistent root volume in the VNFD, the flavor "disk" is expected to be 0."""
    vnfd_id = "ad6356e3-698c-43bf-9901-3aae9e9b9d18"
    persistent_type = "persistent-storage:persistent-storage"

    # NOTE: `db` is not defined in this method; it must come from an outer scope.
    db.get_one.return_value = {
        "_id": vnfd_id,
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "several_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "id": "several_volumes-vnf",
        "product-name": "several_volumes-vnf",
        "vdu": [
            {
                "id": "several_volumes-VM",
                "name": "several_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["persistent-root-volume"],
            }
        ],
        "version": "1.0",
        "virtual-storage-desc": [
            {
                "id": "persistent-root-volume",
                "type-of-storage": persistent_type,
                "size-of-storage": "10",
            },
        ],
        "_admin": {
            "storage": {"fs": "mongo", "path": "/app/storage/"},
            "type": "vnfd",
        },
    }

    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    request = {
        "vnf": [
            {
                "vdur": [
                    {
                        "vdu-name": "several_volumes-VM",
                        "ns-flavor-id": "test_id",
                        "virtual-storages": [
                            {
                                "type-of-storage": persistent_type,
                                "size-of-storage": "10",
                            },
                        ],
                    },
                ],
                "vnfd-id": vnfd_id,
            },
        ],
    }
    sizing = {"disk": 0, "ram": 1024, "vcpus": 2}
    wanted = {
        "find_params": {"flavor_data": dict(sizing)},
        "params": {"flavor_data": {**sizing, "name": "test"}},
    }
    epa_params.return_value = {}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=request,
        vim_info={},
        target_record_id="",
        db=db,
    )

    epa_params.assert_called()
    self.assertDictEqual(outcome, wanted)

2311 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_epa_params(self, epa_params):
    """The mocked EPA helper's return value appears under "extended" in the flavor data."""
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    request = {
        "vnf": [
            {
                "vdur": [{"ns-flavor-id": "test_id"}],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    wanted = {
        "find_params": {
            "flavor_data": {
                "disk": 10,
                "ram": 1024,
                "vcpus": 2,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
        "params": {
            "flavor_data": {
                "disk": 10,
                "name": "test",
                "ram": 1024,
                "vcpus": 2,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
    }
    # Kept as its own literal so the expectation never aliases the mock value.
    epa_params.return_value = {"numa": "there-is-numa-here"}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=request,
        vim_info={},
        target_record_id="",
    )

    epa_params.assert_called()
    self.assertDictEqual(outcome, wanted)

2375 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params_with_vim_flavor_id(self, epa_params):
    """A "vim_flavor_id" in the VDU's OSM params is the only find parameter; EPA is not called."""
    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    request = {
        "vnf": [
            {
                "vdur": [
                    {
                        "ns-flavor-id": "test_id",
                        "additionalParams": {
                            "OSM": {"vim_flavor_id": "test.flavor"}
                        },
                    },
                ],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    wanted = {"find_params": {"vim_flavor_id": "test.flavor"}}
    epa_params.return_value = {}

    outcome = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=request,
        vim_info={},
        target_record_id="",
    )

    epa_params.assert_not_called()
    self.assertDictEqual(outcome, wanted)

2422 

@patch("osm_ng_ro.ns.Ns._process_epa_params")
def test__process_flavor_params(
    self,
    epa_params,
):
    """Check the flavor built from a vdur that declares ephemeral and swap storage.

    The db mock returns a vnfd without persistent volumes, and the expected
    flavor data carries the vdur storage sizes as integers plus the mocked
    EPA value under "extended".
    """
    kwargs = {"db": db}

    # vnfd document handed back by the database mock.
    db.get_one.return_value = {
        "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
        "id": "without_volumes-vnf",
        "product-name": "without_volumes-vnf",
        "version": "1.0",
        "df": [
            {
                "id": "default-df",
                "vdu-profile": [
                    {"id": "without_volumes-VM", "min-number-of-instances": 1}
                ],
            }
        ],
        "vdu": [
            {
                "id": "without_volumes-VM",
                "name": "without_volumes-VM",
                "sw-image-desc": "ubuntu20.04",
                "alternative-sw-image-desc": [
                    "ubuntu20.04-aws",
                    "ubuntu20.04-azure",
                ],
                "virtual-storage-desc": ["root-volume", "ephemeral-volume"],
            }
        ],
        "virtual-storage-desc": [
            {"id": "root-volume", "size-of-storage": "10"},
            {
                "id": "ephemeral-volume",
                "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
                "size-of-storage": "1",
            },
        ],
        "_admin": {
            "storage": {
                "fs": "mongo",
                "path": "/app/storage/",
            },
            "type": "vnfd",
        },
    }

    flavor = {
        "id": "test_id",
        "name": "test",
        "storage-gb": "10",
        "memory-mb": "1024",
        "vcpu-count": "2",
    }
    vdur_storages = [
        {
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
            "size-of-storage": "10",
        },
        {
            "type-of-storage": "etsi-nfv-descriptors:swap-storage",
            "size-of-storage": "20",
        },
    ]
    ns_data = {
        "vnf": [
            {
                "vdur": [
                    {
                        "ns-flavor-id": "test_id",
                        "virtual-storages": vdur_storages,
                    },
                ],
                "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18",
            },
        ],
    }
    epa_params.return_value = {"numa": "there-is-numa-here"}

    result = Ns._process_flavor_params(
        target_flavor=flavor,
        indata=ns_data,
        vim_info={},
        target_record_id="",
        **kwargs,
    )

    expected = {
        "find_params": {
            "flavor_data": {
                "disk": 10,
                "ram": 1024,
                "vcpus": 2,
                "ephemeral": 10,
                "swap": 20,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
        "params": {
            "flavor_data": {
                "disk": 10,
                "name": "test",
                "ram": 1024,
                "vcpus": 2,
                "ephemeral": 10,
                "swap": 20,
                "extended": {"numa": "there-is-numa-here"},
            },
        },
    }
    self.assertTrue(epa_params.called)
    self.assertDictEqual(result, expected)

2547 

def test__process_net_params_with_empty_params(
    self,
):
    """Check net params for a vld that only has a name.

    The expected params combine the ns and vld names, use the "bridge" net
    type and copy the provider network and ip profile from vim_info.
    """
    vld = {"name": "vld-name"}
    ns_data = {"name": "ns-name"}
    vim = {
        "provider_network": "some-profile-here",
        "ip_profile": {"some_ip_profile": "here"},
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="",
    )

    expected = {
        "params": {
            "net_name": "ns-name-vld-name",
            "net_type": "bridge",
            "ip_profile": {"some_ip_profile": "here"},
            "provider_network_profile": "some-profile-here",
        }
    }
    self.assertDictEqual(expected, result)

2583 

def test__process_net_params_with_vim_info_sdn(
    self,
):
    """Check net params when vim_info carries "sdn" and the record id contains ".sdn".

    The expected params hold the sdn-ports, vlds and type from vim_info;
    the "sdn" entry itself is not part of the expected params.
    """
    vld = {"name": "vld-name"}
    ns_data = {"name": "ns-name"}
    vim = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "type": "sdn-type",
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="vld.sdn.something",
    )

    expected = {
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "type": "sdn-type",
        }
    }
    self.assertDictEqual(expected, result)

2616 

def test__process_net_params_with_vim_info_sdn_target_vim(
    self,
):
    """Check net params when the sdn vim_info also names a target_vim.

    Besides the params, the expected result lists a "depends_on" entry made
    of the target vim and the "vld.sdn" part of the record id.
    """
    vld = {"name": "vld-name"}
    ns_data = {"name": "ns-name"}
    vim = {
        "sdn": "some-sdn",
        "sdn-ports": ["some", "ports", "here"],
        "vlds": ["some", "vlds", "here"],
        "target_vim": "some-vim",
        "type": "sdn-type",
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="vld.sdn.something",
    )

    expected = {
        "depends_on": ["some-vim vld.sdn"],
        "params": {
            "sdn-ports": ["some", "ports", "here"],
            "vlds": ["some", "vlds", "here"],
            "target_vim": "some-vim",
            "type": "sdn-type",
        },
    }
    self.assertDictEqual(expected, result)

2652 

def test__process_net_params_with_vim_network_name(
    self,
):
    """Check that a vim_network_name yields find_params filtered by name."""
    vld = {"name": "vld-name"}
    ns_data = {"name": "ns-name"}
    vim = {"vim_network_name": "some-network-name"}

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="vld.sdn.something",
    )

    expected = {
        "find_params": {
            "filter_dict": {"name": "some-network-name"},
        },
    }
    self.assertDictEqual(expected, result)

2682 

def test__process_net_params_with_vim_network_id(
    self,
):
    """Check that a vim_network_id yields find_params filtered by id."""
    vld = {"name": "vld-name"}
    ns_data = {"name": "ns-name"}
    vim = {"vim_network_id": "some-network-id"}

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="vld.sdn.something",
    )

    expected = {
        "find_params": {
            "filter_dict": {"id": "some-network-id"},
        },
    }
    self.assertDictEqual(expected, result)

2712 

def test__process_net_params_with_mgmt_network(
    self,
):
    """Check that a vld flagged as mgmt-network yields mgmt find_params.

    With an empty vim_info, the expected find_params mark the network as
    mgmt and name it after the vld id.
    """
    vld = {
        "id": "vld-id",
        "name": "vld-name",
        "mgmt-network": "some-mgmt-network",
    }
    ns_data = {"name": "ns-name"}

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info={},
        target_record_id="vld.sdn.something",
    )

    expected = {
        "find_params": {
            "mgmt": True,
            "name": "vld-id",
        },
    }
    self.assertDictEqual(expected, result)

2741 

def test__process_net_params_with_underlay_eline(
    self,
):
    """Check that an underlay vld of type ELINE is expected as net_type "ptp"."""
    vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELINE",
    }
    ns_data = {"name": "ns-name"}
    vim = {
        "provider_network": "some-profile-here",
        "ip_profile": {"some_ip_profile": "here"},
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="",
    )

    expected = {
        "params": {
            "ip_profile": {"some_ip_profile": "here"},
            "net_name": "ns-name-vld-name",
            "net_type": "ptp",
            "provider_network_profile": "some-profile-here",
        }
    }
    self.assertDictEqual(expected, result)

2779 

def test__process_net_params_with_underlay_elan(
    self,
):
    """Check that an underlay vld of type ELAN is expected as net_type "data"."""
    vld = {
        "name": "vld-name",
        "underlay": "some-underlay-here",
        "type": "ELAN",
    }
    ns_data = {"name": "ns-name"}
    vim = {
        "provider_network": "some-profile-here",
        "ip_profile": {"some_ip_profile": "here"},
    }

    result = Ns._process_net_params(
        target_vld=vld,
        indata=ns_data,
        vim_info=vim,
        target_record_id="",
    )

    expected = {
        "params": {
            "ip_profile": {"some_ip_profile": "here"},
            "net_name": "ns-name-vld-name",
            "net_type": "data",
            "provider_network_profile": "some-profile-here",
        }
    }
    self.assertDictEqual(expected, result)

2817 

def test__get_cloud_init_exception(self):
    """An empty location string must make _get_cloud_init raise NsException."""
    database = MagicMock(name="database mock")

    with self.assertRaises(NsException):
        Ns._get_cloud_init(db=database, fs=None, location="")

2826 

def test__get_cloud_init_file_fs_exception(self):
    """A "file" location combined with fs=None must raise NsException."""
    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {
                "folder": "/home/osm",
                "pkg-dir": "vnfr_test_dir",
            },
        },
    }

    with self.assertRaises(NsException):
        Ns._get_cloud_init(
            db=database, fs=None, location="vnfr_id_123456:file:test_file"
        )

2843 

def test__get_cloud_init_file(self):
    """A "file" location returns what the opened file's read() yields."""
    content = "this is a cloud init file content"

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "_admin": {
            "storage": {
                "folder": "/home/osm",
                "pkg-dir": "vnfr_test_dir",
            },
        },
    }

    # file_open() hands back a context manager whose read() gives the content.
    opened_file = MagicMock(name="file mock")
    opened_file.__enter__.return_value.read.return_value = content
    filesystem = MagicMock(name="filesystem mock")
    filesystem.file_open.return_value = opened_file

    result = Ns._get_cloud_init(
        db=database, fs=filesystem, location="vnfr_id_123456:file:test_file"
    )

    self.assertEqual(content, result)

2866 

def test__get_cloud_init_vdu(self):
    """A "vdu" location returns the cloud-init stored in the db record; fs is None."""
    content = "this is a cloud init file content"

    database = MagicMock(name="database mock")
    database.get_one.return_value = {
        "vdu": {
            0: {"cloud-init": content},
        },
    }

    result = Ns._get_cloud_init(
        db=database, fs=None, location="vnfr_id_123456:vdu:0"
    )

    self.assertEqual(content, result)

2885 

@patch("jinja2.Environment.__init__")
def test__parse_jinja2_undefined_error(self, env_mock: Mock):
    """An UndefinedError from Environment creation must surface as NsException."""
    env_mock.side_effect = UndefinedError("UndefinedError occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)

2898 

@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_error(self, env_mock: Mock):
    """A TemplateError from Environment creation must surface as NsException."""
    env_mock.side_effect = TemplateError("TemplateError occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)

2911 

@patch("jinja2.Environment.__init__")
def test__parse_jinja2_template_not_found(self, env_mock: Mock):
    """A TemplateNotFound from Environment creation must surface as NsException."""
    env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.")

    with self.assertRaises(NsException):
        Ns._parse_jinja2(cloud_init_content=None, params=None, context=None)

2924 

def test_rendering_jinja2_temp_without_special_characters(self):
    """Render a cloud-init template whose params contain no "&" character.

    The template and the expected text share one layout, so they differ only
    in the three substituted placeholders.
    """
    template = """
        disk_setup:
            ephemeral0:
                table_type: {{type}}
                layout: True
                overwrite: {{is_override}}
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '{{command}}'" ]
        """
    rendered_as = """
        disk_setup:
            ephemeral0:
                table_type: mbr
                layout: True
                overwrite: False
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '; mkdir abc'" ]
        """
    values = {
        "type": "mbr",
        "is_override": "False",
        "command": "; mkdir abc",
    }

    result = Ns._parse_jinja2(
        cloud_init_content=template, params=values, context="cloud-init for VM"
    )

    self.assertEqual(result, rendered_as)

2956 

def test_rendering_jinja2_temp_with_special_characters(self):
    """Rendering a command containing "&" must not reproduce it verbatim.

    The command passed in is the same text that appears in the verbatim
    expectation below. The inequality can therefore only hold because the
    rendered output differs from the raw input, not because the two command
    strings were different to begin with.
    """
    cloud_init_content = """
        disk_setup:
            ephemeral0:
                table_type: {{type}}
                layout: True
                overwrite: {{is_override}}
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '{{command}}'" ]
        """
    params = {
        "type": "mbr",
        "is_override": "False",
        # Kept identical to the command embedded in expected_result.
        "command": "& rm -rf /",
    }
    context = "cloud-init for VM"
    # What the output would look like if the command were inserted verbatim.
    expected_result = """
        disk_setup:
            ephemeral0:
                table_type: mbr
                layout: True
                overwrite: False
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
        """
    result = Ns._parse_jinja2(
        cloud_init_content=cloud_init_content, params=params, context=context
    )
    self.assertNotEqual(result, expected_result)

2988 

def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self):
    """Render a command containing "&" with an Environment that has autoescape off.

    osm_ng_ro.ns.Environment is patched to return an Environment built with
    autoescape disabled, and the command is expected verbatim in the output.
    """
    template = """
        disk_setup:
            ephemeral0:
                table_type: {{type}}
                layout: True
                overwrite: {{is_override}}
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '{{command}}'" ]
        """
    rendered_as = """
        disk_setup:
            ephemeral0:
                table_type: mbr
                layout: True
                overwrite: False
        runcmd:
             - [ ls, -l, / ]
             - [ sh, -xc, "echo $(date) '& rm -rf /'" ]
        """
    values = {
        "type": "mbr",
        "is_override": "False",
        "command": "& rm -rf /",
    }

    with patch("osm_ng_ro.ns.Environment") as mock_environment:
        mock_environment.return_value = Environment(
            undefined=StrictUndefined,
            autoescape=select_autoescape(default_for_string=False, default=False),
        )
        result = Ns._parse_jinja2(
            cloud_init_content=template,
            params=values,
            context="cloud-init for VM",
        )
        self.assertEqual(result, rendered_as)

3027 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__successful(self, assign_vim):
    """Build a task for each of start, stop and rebuild and compare it to the template.

    The expected task is the shared template with the target record for vdu
    index 0 and the params passed through extra_dict.
    """
    self.ns = Ns()
    extra_dict = {}
    vdu_index = "0"
    task_index = 0
    record = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    for action in ("start", "stop", "rebuild"):
        action_params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = action_params

        expected = deepcopy(expected_result_rebuild_start_stop)
        expected["target_record"] = record
        expected["params"] = action_params

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected)

3057 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__empty_extra_dict__task_without_params(
    self, assign_vim
):
    """Build the task three times with an empty extra_dict.

    The expected task is the shared template with only the target record
    set; no params entry is added to it.
    """
    self.ns = Ns()
    extra_dict = {}
    vdu_index = "0"
    task_index = 0

    expected = deepcopy(expected_result_rebuild_start_stop)
    expected["target_record"] = (
        "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    )

    # One call per action name; the action itself is not passed anywhere.
    for _ in ("start", "stop", "rebuild"):
        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected)

3083 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__different_vdu_index__target_record_changes(
    self, assign_vim
):
    """Build the tasks with vdu index "4" and expect "vdur.4" in the target record."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index = "4"
    task_index = 0
    record = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    for action in ("start", "stop", "rebuild"):
        action_params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = action_params

        expected = deepcopy(expected_result_rebuild_start_stop)
        expected["target_record"] = record
        expected["params"] = action_params

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected)

3115 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__different_task_index__task_id_changes(
    self, assign_vim
):
    """Build the tasks with task index 3 and expect a task id ending in ":3"."""
    self.ns = Ns()
    extra_dict = {}
    vdu_index = "0"
    task_index = 3
    record = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    for action in ("start", "stop", "rebuild"):
        action_params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = action_params

        expected = deepcopy(expected_result_rebuild_start_stop)
        expected["target_record"] = record
        expected["params"] = action_params
        expected["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:3"

        task = self.ns.rebuild_start_stop_task(
            vdu_id,
            vnf_id,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            target_vim,
            extra_dict,
        )
        self.assertDictEqual(task, expected)

3148 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test__rebuild_start_stop_task__assign_vim_raises__task_is_not_created(
    self, assign_vim
):
    """The exception raised by the mocked _assign_vim must reach the caller.

    For each action the call is expected to raise TestException with the
    message configured on the mock. No assertion is made on a returned task,
    because the call never returns.
    """
    self.ns = Ns()
    extra_dict = {}
    actions = ["start", "stop", "rebuild"]
    vdu_index = "0"
    task_index = 0
    for action in actions:
        params = {
            "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396",
            "action": action,
        }
        extra_dict["params"] = params
        assign_vim.side_effect = TestException("Can not connect to VIM.")
        with self.assertRaises(TestException) as err:
            self.ns.rebuild_start_stop_task(
                vdu_id,
                vnf_id,
                vdu_index,
                action_id,
                nsr_id_2,
                task_index,
                target_vim,
                extra_dict,
            )
        self.assertEqual(str(err.exception), "Can not connect to VIM.")

3178 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__successful(self, assign_vim):
    """Build a vertical scale task and compare it to the shared expected task."""
    self.ns = Ns()
    call_args = (
        vdu,
        vnf,
        "1",  # vdu_index
        action_id,
        nsr_id_2,
        1,  # task_index
        extra_dict_vertical_scale,
    )
    task = self.ns.verticalscale_task(*call_args)
    self.assertDictEqual(task, expected_result_vertical_scale)

3194 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__task_index_changes__task_id_changes(self, assign_vim):
    """Build the task with task index 2 and expect a task id ending in ":2"."""
    self.ns = Ns()

    expected = deepcopy(expected_result_vertical_scale)
    expected["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:2"

    call_args = (
        vdu,
        vnf,
        "1",  # vdu_index
        action_id,
        nsr_id_2,
        2,  # task_index
        extra_dict_vertical_scale,
    )
    task = self.ns.verticalscale_task(*call_args)
    self.assertDictEqual(task, expected)

3212 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__empty_extra_dict__expected_result_without_params(
    self, assign_vim
):
    """Build the task from an extra_dict whose params mapping is empty.

    The expected task's params then hold only the flavor_id reference, and
    depends_on names the same flavor record.
    """
    self.ns = Ns()
    vdu_index = "1"
    task_index = 1
    extra_dict = {"params": {}}

    flavor_record = "nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0"
    expected = {
        "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
        "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3",
        "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31",
        "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1",
        "status": "SCHEDULED",
        "action": "EXEC",
        "item": "verticalscale",
        "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35",
        "target_record_id": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1",
        "params": {"flavor_id": "TASK-" + flavor_record},
        "depends_on": [flavor_record],
    }

    task = self.ns.verticalscale_task(
        vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict
    )
    self.assertDictEqual(task, expected)

3241 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_verticalscale_task__assign_vim_raises__task_is_not_created(
    self, assign_vim
):
    """The exception raised by the mocked _assign_vim must reach the caller.

    The call is expected to raise TestException with the message configured
    on the mock. No assertion is made on a returned task, because the call
    never returns.
    """
    self.ns = Ns()
    vdu_index = "1"
    task_index = 1
    assign_vim.side_effect = TestException("Can not connect to VIM.")
    with self.assertRaises(TestException) as err:
        self.ns.verticalscale_task(
            vdu,
            vnf,
            vdu_index,
            action_id,
            nsr_id_2,
            task_index,
            extra_dict_vertical_scale,
        )
    self.assertEqual(str(err.exception), "Can not connect to VIM.")

3262 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__successful(self, assign_vim):
    """Build a migrate task and compare it to the shared expected task."""
    self.ns = Ns()
    call_args = (
        vdu,
        vnf,
        "1",  # vdu_index
        action_id,
        nsr_id_2,
        1,  # task_index
        extra_dict_migrate,
    )
    task = self.ns.migrate_task(*call_args)
    self.assertDictEqual(task, expected_result_migrate)

3272 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__empty_extra_dict__task_without_params(self, assign_vim):
    """Build the task from an empty extra_dict and expect no "params" entry."""
    self.ns = Ns()
    vdu_index = "1"
    task_index = 1
    no_extras = {}

    # Same as the shared expected task, minus its params.
    expected = deepcopy(expected_result_migrate)
    del expected["params"]

    task = self.ns.migrate_task(
        vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, no_extras
    )
    self.assertDictEqual(task, expected)

3285 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__different_vdu_index__target_record_with_different_vdu_index(
    self, assign_vim
):
    """Build the task with vdu index "4" and expect "vdur.4" in the target record."""
    self.ns = Ns()
    vdu_index = "4"
    task_index = 1

    expected = deepcopy(expected_result_migrate)
    expected["target_record"] = (
        "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35"
    )

    task = self.ns.migrate_task(
        vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict_migrate
    )
    self.assertDictEqual(task, expected)

3301 

@patch("osm_ng_ro.ns.Ns._assign_vim")
def test_migrate_task__assign_vim_raises__task_is_not_created(self, assign_vim):
    """The exception raised by the mocked _assign_vim must reach the caller.

    The call is expected to raise TestException with the message configured
    on the mock. No assertion is made on a returned task, because the call
    never returns.
    """
    self.ns = Ns()
    vdu_index = "1"
    task_index = 1
    assign_vim.side_effect = TestException("Can not connect to VIM.")
    with self.assertRaises(TestException) as err:
        self.ns.migrate_task(
            vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict_migrate
        )
    self.assertEqual(str(err.exception), "Can not connect to VIM.")

3314 

3315 

3316class TestProcessVduParams(unittest.TestCase): 

def setUp(self):
    """Create a fresh Ns instance and a CopyingMock logger for each test."""
    self.ns, self.logger = Ns(), CopyingMock(autospec=True)

3320 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """Root volume lookup with no instantiation volumes and keep reported as True.

    Expects one disk built from the persistent-root-volume descriptor, both
    in the returned mapping and in the disk list passed in.
    """
    mock_volume_keeping_required.return_value = True
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    collected_disks = []

    found = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    root_vsd = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}],
    }
    self.assertEqual(
        found,
        {
            "persistent-root-volume": {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": True,
            }
        },
    )
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
    self.assertEqual(
        collected_disks,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": True}],
    )
    self.assertEqual(len(collected_disks), 1)

3358 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_always_selects_first_vsd_as_root(
    self, mock_volume_keeping_required
):
    """Root volume lookup after moving persistent-volume2 to the front of the vdu list.

    Expects the first listed descriptor, persistent-volume2, to be reported
    as the root disk.
    """
    mock_volume_keeping_required.return_value = True
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    collected_disks = []

    found = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    root_vsd = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    self.assertEqual(
        found,
        {
            "persistent-volume2": {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": True,
            }
        },
    )
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
    self.assertEqual(
        collected_disks,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": True}],
    )
    self.assertEqual(len(collected_disks), 1)

3400 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_empty_size_of_storage(
    self, mock_volume_keeping_required
):
    """Root volume lookup after blanking the first descriptor's size-of-storage.

    Expects an empty mapping, an untouched disk list and no call to the
    keep-volume check.
    """
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][0]["size-of-storage"] = ""
    descriptor["vdu"][0]["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    collected_disks = []

    found = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(found, {})
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, [])

3422 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_keeping_is_not_required(
    self, mock_volume_keeping_required
):
    """Root volume lookup when the keep-volume check reports False.

    Expects the root disk to be returned and listed with "keep" set to False.
    """
    mock_volume_keeping_required.return_value = False
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["virtual-storage-desc"][1]["vdu-storage-requirements"] = [
        {"key": "keep-volume", "value": "false"},
    ]
    collected_disks = []

    found = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    root_vsd = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}],
    }
    self.assertEqual(
        found,
        {
            "persistent-root-volume": {
                "image_id": "ubuntu20.04",
                "size": "10",
                "keep": False,
            }
        },
    )
    mock_volume_keeping_required.assert_called_once_with(root_vsd)
    self.assertEqual(
        collected_disks,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": False}],
    )
    self.assertEqual(len(collected_disks), 1)

3463 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_target_vdu_mismatch(
    self, mock_volume_keeping_required
):
    """Root volume lookup after renaming the vnfd vdu to "Several_Volumes-VM".

    Expects None as the result, an empty disk list and no call to the
    keep-volume check.
    """
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    descriptor["vdu"][0]["name"] = "Several_Volumes-VM"
    collected_disks = []

    result = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, [], collected_disks
    )

    self.assertEqual(result, None)
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(collected_disks, [])
    self.assertEqual(len(collected_disks), 0)

3481 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_with_instantiation_vol_list(
    self, mock_volume_keeping_required
):
    """Root volume lookup when an existing VIM volume is supplied for it.

    The instantiation volume list names "persistent-root-volume" and gives a
    vim-volume-id; the resulting entry is expected to reference that volume
    and is_volume_keeping_required must not be consulted.
    """
    descriptor = deepcopy(vnfd_wth_persistent_storage)
    supplied_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-root-volume"}
    ]
    disks = []
    root_entry = {"vim_volume_id": vim_volume_id, "image_id": "ubuntu20.04"}

    found = self.ns.find_persistent_root_volumes(
        descriptor, target_vdu_wth_persistent_storage, supplied_volumes, disks
    )

    self.assertEqual(found, {"persistent-root-volume": root_entry})
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disks, [root_entry])
    self.assertEqual(len(disks), 1)

3515 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_root_volumes_invalid_instantiation_params(
    self, mock_volume_keeping_required
):
    """Root volume lookup when the instantiation volume uses a wrong keyword.

    The volume entry carries "volume-id" instead of "vim-volume-id"; the
    lookup is expected to raise KeyError without consulting
    is_volume_keeping_required and without adding anything to the disk list.
    """
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    target_vdu = target_vdu_wth_persistent_storage
    vdu_instantiation_volumes_list = [
        {
            "volume-id": vim_volume_id,
            "name": "persistent-root-volume",
        }
    ]
    disk_list = []

    with self.assertRaises(KeyError):
        self.ns.find_persistent_root_volumes(
            vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
        )

    # Checked after the with-block: statements placed behind the raising call
    # inside the block would never execute.
    mock_volume_keeping_required.assert_not_called()
    self.assertEqual(disk_list, [])

3537 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup with a known root disk and no instantiation volumes.

    The disk list starts with the root-disk entry; one more entry
    (size "10", keep False) is expected to be appended, and the keep
    decision is expected to be asked once for "persistent-volume2".
    """
    mock_volume_keeping_required.return_value = False
    root_entry = {"image_id": "ubuntu20.04", "size": "10", "keep": False}
    known_root_disks = {"persistent-root-volume": dict(root_entry)}
    disks = [dict(root_entry)]

    self.ns.find_persistent_volumes(
        known_root_disks, target_vdu_wth_persistent_storage, [], disks
    )

    self.assertEqual(disks, [root_entry, {"size": "10", "keep": False}])
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        }
    )

3581 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_inst_vol_list(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup when a vim-volume-id is supplied for it.

    The instantiation volume list names "persistent-volume2"; the appended
    disk entry is expected to hold only that vim_volume_id and the keep
    decision must not be consulted.
    """
    root_entry = {"image_id": "ubuntu20.04", "size": "10", "keep": False}
    known_root_disks = {"persistent-root-volume": dict(root_entry)}
    supplied_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume2"}
    ]
    disks = [dict(root_entry)]

    self.ns.find_persistent_volumes(
        known_root_disks,
        target_vdu_wth_persistent_storage,
        supplied_volumes,
        disks,
    )

    self.assertEqual(disks, [root_entry, {"vim_volume_id": vim_volume_id}])
    mock_volume_keeping_required.assert_not_called()

3623 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wthout_persistent_storage(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup for a vdu without any persistent disk.

    With no root disk, no instantiation volumes and the
    target_vdu_wthout_persistent_storage fixture, the disk list is expected
    to stay empty and the keep decision must not be consulted.
    """
    persistent_root_disk = {}
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = target_vdu_wthout_persistent_storage
    disk_list = []

    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    # Compare against a literal: comparing the list with itself can never fail.
    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()

3639 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup when only root and ephemeral storage exist.

    The vdu's virtual storages are the persistent root volume (already present
    in persistent_root_disk) and one ephemeral volume, so the disk list is
    expected to stay as it was and the keep decision must not be consulted.
    """
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        }
    }
    vdu_instantiation_volumes_list = []
    mock_volume_keeping_required.return_value = False
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["virtual-storages"] = [
        {
            "id": "persistent-root-volume",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
            "vdu-storage-requirements": [
                {"key": "keep-volume", "value": "true"},
            ],
        },
        {
            "id": "ephemeral-volume",
            "size-of-storage": "1",
            "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage",
        },
    ]
    disk_list = [
        {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": False,
        },
    ]
    # Snapshot taken before the call; comparing the list with itself
    # afterwards could never fail.
    expected_disk_list = deepcopy(disk_list)

    self.ns.find_persistent_volumes(
        persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list
    )

    self.assertEqual(disk_list, expected_disk_list)
    mock_volume_keeping_required.assert_not_called()

3682 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch(
    self, mock_volume_keeping_required
):
    """Ordinary persistent volume lookup when the supplied volume name matches no disk.

    The instantiation volume list names "persistent-volume3"; a new entry
    (size "10", keep True from the patched keep decision) is expected to be
    appended, and the keep decision is expected to be asked once for
    "persistent-volume2".
    """
    mock_volume_keeping_required.return_value = True
    root_entry = {"image_id": "ubuntu20.04", "size": "10", "keep": False}
    known_root_disks = {"persistent-root-volume": dict(root_entry)}
    supplied_volumes = [
        {"vim-volume-id": vim_volume_id, "name": "persistent-volume3"}
    ]
    disks = [dict(root_entry)]

    self.ns.find_persistent_volumes(
        known_root_disks,
        target_vdu_wth_persistent_storage,
        supplied_volumes,
        disks,
    )

    self.assertEqual(disks, [root_entry, {"size": "10", "keep": True}])
    mock_volume_keeping_required.assert_called_once_with(
        {
            "id": "persistent-volume2",
            "size-of-storage": "10",
            "type-of-storage": "persistent-storage:persistent-storage",
        }
    )

3733 

def test_is_volume_keeping_required_true(self):
    """A keep-volume requirement with value "true" is expected to yield True."""
    keep_requirement = {"key": "keep-volume", "value": "true"}
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), True)

3746 

def test_is_volume_keeping_required_false(self):
    """A keep-volume requirement with value "false" is expected to yield False."""
    keep_requirement = {"key": "keep-volume", "value": "false"}
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [keep_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), False)

3759 

def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self):
    """A descriptor without the vdu-storage-requirements key is expected to yield False."""
    descriptor = dict(
        [
            ("id", "persistent-root-volume"),
            ("type-of-storage", "persistent-storage:persistent-storage"),
            ("size-of-storage", "10"),
        ]
    )
    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), False)

3769 

def test_is_volume_keeping_required_wrong_keyword(self):
    """A requirement keyed "hold-volume" instead of "keep-volume" is expected to yield False."""
    unknown_requirement = {"key": "hold-volume", "value": "true"}
    descriptor = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [unknown_requirement],
    }
    self.assertEqual(self.ns.is_volume_keeping_required(descriptor), False)

3782 

def test_sort_vdu_interfaces_position_all_wth_positions(self):
    """Both interfaces carry a position; the list is expected to end up in position order."""
    mgmt_iface = {"name": "vdu-eth0", "ns-vld-id": "mgmtnet", "position": 1}
    data_iface = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 2}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Start in reverse position order so that sorting has something to do.
    vdu["interfaces"] = [dict(data_iface), dict(mgmt_iface)]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [mgmt_iface, data_iface])

3812 

def test_sort_vdu_interfaces_position_some_wth_position(self):
    """Only one interface carries a position; it is expected to be moved to the front."""
    unplaced_iface = {"name": "vdu-eth0", "ns-vld-id": "mgmtnet"}
    placed_iface = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 1}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [dict(unplaced_iface), dict(placed_iface)]

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [placed_iface, unplaced_iface])

3840 

def test_sort_vdu_interfaces_position_empty_interface_list(self):
    """An empty interface list is expected to stay empty after sorting."""
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = []

    self.ns._sort_vdu_interfaces(vdu)

    self.assertEqual(vdu["interfaces"], [])

3848 

def test_partially_locate_vdu_interfaces(self):
    """Two of four interfaces carry positions (1 and 3).

    After locating, the position-1 interface is expected at index 0 and the
    position-3 interface at index 2.
    """
    first_placed = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 1}
    third_placed = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        dict(third_placed),
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
        dict(first_placed),
    ]

    self.ns._partially_locate_vdu_interfaces(vdu)

    located = vdu["interfaces"]
    self.assertDictEqual(located[0], first_placed)
    self.assertDictEqual(located[2], third_placed)

3881 

def test_partially_locate_vdu_interfaces_position_start_from_0(self):
    """Two of four interfaces carry positions (0 and 3).

    After locating, the position-0 interface is expected at index 0 and the
    position-3 interface at index 3.
    """
    zero_placed = {"name": "vdu-eth1", "ns-vld-id": "datanet", "position": 0}
    third_placed = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = [
        {"name": "vdu-eth1", "ns-vld-id": "net1"},
        dict(third_placed),
        {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"},
        dict(zero_placed),
    ]

    self.ns._partially_locate_vdu_interfaces(vdu)

    located = vdu["interfaces"]
    self.assertDictEqual(located[0], zero_placed)
    self.assertDictEqual(located[3], third_placed)

3914 

def test_partially_locate_vdu_interfaces_wthout_position(self):
    """No interface carries a position; the interface list is expected to be unchanged."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Hand the method a private copy of the shared module-level fixture so
    # that any in-place change it makes cannot leak into other tests.
    target_vdu["interfaces"] = deepcopy(interfaces_wthout_positions)
    expected_result = deepcopy(interfaces_wthout_positions)

    self.ns._partially_locate_vdu_interfaces(target_vdu)

    self.assertEqual(target_vdu["interfaces"], expected_result)

3922 

def test_partially_locate_vdu_interfaces_all_has_position(self):
    """Every interface carries a position; the list is expected in position order 0, 1, 2."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Hand the method a private copy of the shared module-level fixture so
    # that any in-place change it makes cannot leak into other tests.
    target_vdu["interfaces"] = deepcopy(interfaces_wth_all_positions)
    expected_interfaces = [
        {
            "name": "vdu-eth2",
            "ns-vld-id": "net2",
            "position": 0,
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
            "position": 1,
        },
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "position": 2,
        },
    ]

    self.ns._partially_locate_vdu_interfaces(target_vdu)

    self.assertEqual(target_vdu["interfaces"], expected_interfaces)

3946 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init(self, mock_parse_jinja2, mock_get_cloud_init):
    """The vdu has both cloud-init and boot-data-drive.

    The result is expected to contain the rendered user data and the boot
    data drive; the cloud-init content is fetched once and rendered once.
    """
    cloud_init_path = "sample-cloud-init-path"
    mock_parse_jinja2.return_value = user_data
    mock_get_cloud_init.return_value = cloud_init_content
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"cloud-init": cloud_init_path, "boot-data-drive": "vda"})

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(
        cloud_config, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location=cloud_init_path
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )

3971 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """The vdu has cloud-init and boot-data-drive, and _get_cloud_init raises.

    The NsException is expected to reach the caller with its message, after
    one fetch attempt and without any rendering.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.side_effect = NsException(
        "Mismatch descriptor for cloud init."
    )

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    # Checked after the with-block: err.exception is only set once the block
    # exits, and a statement behind the raising call inside it never runs.
    self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.")
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_not_called()

3994 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """The vdu has cloud-init and boot-data-drive, and _parse_jinja2 raises.

    The NsException is expected to reach the caller with its message, after
    one fetch and one rendering attempt.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["cloud-init"] = "sample-cloud-init-path"
    target_vdu["boot-data-drive"] = "vda"
    vdu2cloud_init = {}
    mock_get_cloud_init.return_value = cloud_init_content
    mock_parse_jinja2.side_effect = NsException("Error parsing cloud-init content.")

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)

    # Checked after the with-block: err.exception is only set once the block
    # exits, and a statement behind the raising call inside it never runs.
    self.assertEqual(str(err.exception), "Error parsing cloud-init content.")
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location="sample-cloud-init-path"
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context="sample-cloud-init-path",
    )

4019 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """The vdu has cloud-init but no boot-data-drive.

    The result is expected to contain only the rendered user data.
    """
    cloud_init_path = "sample-cloud-init-path"
    mock_parse_jinja2.return_value = user_data
    mock_get_cloud_init.return_value = cloud_init_content
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["cloud-init"] = cloud_init_path

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(cloud_config, {"user-data": user_data})
    mock_get_cloud_init.assert_called_once_with(
        db=db, fs=fs, location=cloud_init_path
    )
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )

4044 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """The cloud-init content is already present in the vdu2cloud_init dict.

    No fetch is expected; the content from the dict is rendered once.
    """
    cloud_init_path = "sample-cloud-init-path"
    mock_parse_jinja2.return_value = user_data
    already_loaded = {cloud_init_path: cloud_init_content}
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu.update({"cloud-init": cloud_init_path, "boot-data-drive": "vda"})

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, already_loaded, db, fs)

    self.assertDictEqual(
        cloud_config, {"user-data": user_data, "boot-data-drive": "vda"}
    )
    mock_get_cloud_init.assert_not_called()
    mock_parse_jinja2.assert_called_once_with(
        cloud_init_content=cloud_init_content,
        params=None,
        context=cloud_init_path,
    )

4068 

@patch("osm_ng_ro.ns.Ns._get_cloud_init")
@patch("osm_ng_ro.ns.Ns._parse_jinja2")
def test_prepare_vdu_cloud_init_no_cloud_init(
    self, mock_parse_jinja2, mock_get_cloud_init
):
    """The vdu has a boot-data-drive but no cloud-init.

    The result is expected to contain only the boot data drive, with neither
    a fetch nor a rendering taking place.
    """
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["boot-data-drive"] = "vda"

    cloud_config = self.ns._prepare_vdu_cloud_init(vdu, {}, db, fs)

    self.assertDictEqual(cloud_config, {"boot-data-drive": "vda"})
    for untouched in (mock_get_cloud_init, mock_parse_jinja2):
        untouched.assert_not_called()

4085 

def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self):
    """With both ns-vld-id and vnf-vld-id present, the ns-prefixed text is expected."""
    iface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }

    net_text = self.ns._check_vld_information_of_interfaces(
        iface, ns_preffix, vnf_preffix
    )

    self.assertEqual(net_text, f"{ns_preffix}:vld.mgmtnet")

4098 

def test_check_vld_information_of_interfaces_empty_interfaces(self):
    """An empty interface dict is expected to give an empty string."""
    net_text = self.ns._check_vld_information_of_interfaces(
        {}, ns_preffix, vnf_preffix
    )
    self.assertEqual(net_text, "")

4106 

def test_check_vld_information_of_interfaces_has_only_vnf_vld(self):
    """With only vnf-vld-id present, the vnf-prefixed text is expected."""
    iface = {"name": "vdu-eth0", "vnf-vld-id": "mgmt_cp_int"}

    net_text = self.ns._check_vld_information_of_interfaces(
        iface, ns_preffix, vnf_preffix
    )

    self.assertEqual(net_text, f"{vnf_preffix}:vld.mgmt_cp_int")

4118 

def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix(
    self,
):
    """The interface has only vnf-vld-id and the vnf prefix is None.

    A TypeError is expected.
    """
    interface = {
        "name": "vdu-eth0",
        "vnf-vld-id": "mgmt_cp_int",
    }
    vnf_preffix = None
    # Assert the exception type directly: the object bound by "as" is the
    # assertRaises context manager rather than the exception, and a check
    # placed behind the raising call inside the block never runs.
    with self.assertRaises(TypeError):
        self.ns._check_vld_information_of_interfaces(
            interface, ns_preffix, vnf_preffix
        )

4133 

def test_prepare_interface_port_security_has_security_details(self):
    """Port security keys are expected to be replaced by their underscore forms.

    "port-security-enabled" becomes "port_security" and
    "port-security-disable-strategy" becomes "port_security_disable_strategy".
    """
    common = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    iface = {
        **common,
        "port-security-enabled": True,
        "port-security-disable-strategy": "allow-address-pairs",
    }

    self.ns._prepare_interface_port_security(iface)

    self.assertDictEqual(
        iface,
        {
            **common,
            "port_security": True,
            "port_security_disable_strategy": "allow-address-pairs",
        },
    )

4152 

def test_prepare_interface_port_security_empty_interfaces(self):
    """An empty interface dict is expected to stay empty."""
    iface = {}

    self.ns._prepare_interface_port_security(iface)

    self.assertDictEqual(iface, {})

4159 

def test_prepare_interface_port_security_wthout_port_security(self):
    """An interface without port security keys is expected to be left as it is."""
    iface = {
        "name": "vdu-eth0",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    untouched = dict(iface)

    self.ns._prepare_interface_port_security(iface)

    self.assertDictEqual(iface, untouched)

4174 

def test_create_net_item_of_interface_floating_ip_port_security(self):
    """The interface has floating ip and port security details.

    The net item is expected to carry name, port security and floating ip
    from the interface plus a "TASK-"-prefixed net_id and type "virtual".
    """
    carried_over = {
        "name": "vdu-eth0",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
    }
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }

    net_item = self.ns._create_net_item_of_interface(iface, f"{ns_preffix}")

    self.assertDictEqual(
        net_item,
        {**carried_over, "net_id": f"TASK-{ns_preffix}", "type": "virtual"},
    )

4197 

def test_create_net_item_of_interface_invalid_net_text(self):
    """A net text of None is expected to raise TypeError."""
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    self.assertRaises(
        TypeError, self.ns._create_net_item_of_interface, iface, None
    )

4212 

def test_create_net_item_of_interface_empty_interface(self):
    """An empty interface is expected to give only net_id and type."""
    net_item = self.ns._create_net_item_of_interface({}, ns_preffix)

    self.assertDictEqual(
        net_item, {"net_id": f"TASK-{ns_preffix}", "type": "virtual"}
    )

4223 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_sriov(self, mock_deep_get):
    """The interface type is SR-IOV.

    The net item is expected to get use "data" with model and type "SR-IOV",
    and the task registry entry for the net is expected to have net_type
    "data" afterwards.
    """
    interface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "SR-IOV",
    }
    mock_deep_get.return_value = "SR-IOV"
    # Work on a private copy of the shared module-level task registry so that
    # anything the method writes into it cannot leak into other tests.
    tasks = deepcopy(tasks_by_target_record_id)
    net_text = ns_preffix
    net_item = {}
    expected_net_item = {
        "use": "data",
        "model": "SR-IOV",
        "type": "SR-IOV",
    }

    self.ns._prepare_type_of_interface(interface, tasks, net_text, net_item)

    self.assertDictEqual(net_item, expected_net_item)
    self.assertEqual(
        "data",
        tasks[net_text]["extra_dict"]["params"]["net_type"],
    )
    mock_deep_get.assert_called_once_with(
        tasks, net_text, "extra_dict", "params", "net_type"
    )

4256 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict(
    self, mock_deep_get
):
    """The interface type is PCI-PASSTHROUGH and deep_get returns an empty dict.

    The net item is expected to get use "data" with model and type
    "PCI-PASSTHROUGH"; deep_get is expected to be called once.
    """
    mock_deep_get.return_value = {}
    iface_type = "PCI-PASSTHROUGH"
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": iface_type,
    }
    task_registry = {}
    filled_item = {}

    self.ns._prepare_type_of_interface(
        iface, task_registry, ns_preffix, filled_item
    )

    self.assertDictEqual(
        filled_item, {"use": "data", "model": iface_type, "type": iface_type}
    )
    mock_deep_get.assert_called_once_with(
        task_registry, ns_preffix, "extra_dict", "params", "net_type"
    )

4288 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get):
    """The interface type is OM-MGMT.

    The net item is expected to get only use "mgmt", without deep_get being
    called.
    """
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
        "type": "OM-MGMT",
    }
    filled_item = {}

    self.ns._prepare_type_of_interface(iface, {}, ns_preffix, filled_item)

    self.assertDictEqual(filled_item, {"use": "mgmt"})
    mock_deep_get.assert_not_called()

4313 

@patch("osm_ng_ro.ns.deep_get")
def test_prepare_type_of_interface_type_bridge(self, mock_deep_get):
    """The interface has no type key.

    The net item is expected to get use "bridge" and model None, without
    deep_get being called.
    """
    iface = {
        "name": "vdu-eth0",
        "vcpi": "sample_vcpi",
        "port_security": True,
        "port_security_disable_strategy": "allow-address-pairs",
        "floating_ip": "10.1.1.12",
        "ns-vld-id": "mgmtnet",
        "vnf-vld-id": "mgmt_cp_int",
    }
    filled_item = {}

    self.ns._prepare_type_of_interface(iface, {}, ns_preffix, filled_item)

    self.assertDictEqual(filled_item, {"use": "bridge", "model": None})
    mock_deep_get.assert_not_called()

4338 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Three connected interfaces are prepared without errors.

    Checks the arguments every mocked helper receives per interface, the
    resulting ``net_list`` (ip/mac address copied into the net items) and
    the ``extra_dict`` (dependencies plus index of the mgmt interface).
    """
    interfaces = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "ip-address": "13.2.12.31",
            "mgmt-interface": True,
        },
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = interfaces
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }

    net_texts = [
        f"{ns_preffix}:net1",
        f"{vnf_preffix}:net2",
        f"{ns_preffix}:mgmtnet",
    ]
    net_items = [
        {"name": if_name, "net_id": f"TASK-{ns_preffix}", "type": "virtual"}
        for if_name in ("vdu-eth1", "vdu-eth2", "vdu-eth3")
    ]
    mock_item_of_interface.side_effect = list(net_items)
    mock_vld_information_of_interface.side_effect = list(net_texts)

    # Expected values are built before the call; the first two net items are
    # copies carrying the extra address key, the third is the mock's own item.
    expected_net_list = [
        dict(net_items[0], ip_address="13.2.12.31"),
        dict(net_items[1], mac_address="d0:94:66:ed:fc:e2"),
        net_items[2],
    ]
    expected_extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": list(net_texts),
        "mgmt_vdu_interface": 0,
    }
    net_list = []

    self.ns._prepare_vdu_interfaces(
        target_vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for position, interface in enumerate(interfaces):
        self.assertEqual(
            vld_calls[position][0], (interface, ns_preffix, vnf_preffix)
        )

    port_security_calls = mock_port_security.call_args_list
    for position, interface in enumerate(interfaces):
        self.assertEqual(port_security_calls[position].args[0], interface)

    item_calls = mock_item_of_interface.call_args_list
    for position, interface in enumerate(interfaces):
        self.assertEqual(
            item_calls[position][0], (interface, net_texts[position])
        )

    type_calls = mock_type_of_interface.call_args_list
    for position, interface in enumerate(interfaces):
        self.assertEqual(
            type_calls[position][0],
            (
                interface,
                tasks_by_target_record_id,
                net_texts[position],
                net_items[position],
            ),
        )

    self.assertEqual(net_list, expected_net_list)
    self.assertEqual(extra_dict, expected_extra_dict)
    self.logger.error.assert_not_called()

4461 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_create_net_item_raise_exception(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """TypeError from _create_net_item_of_interface reaches the caller.

    Only the first interface is expected to be visited: its net text is
    already recorded in ``depends_on``, while ``net_list`` stays empty and
    ``_prepare_type_of_interface`` is never called.
    """
    first_interface = {
        "name": "vdu-eth1",
        "ns-vld-id": "net1",
        "ip-address": "13.2.12.31",
        "mgmt-interface": True,
    }
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = [
        first_interface,
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    first_net_text = f"{ns_preffix}:net1"
    mock_vld_information_of_interface.side_effect = [first_net_text]
    mock_item_of_interface.side_effect = [TypeError] * 3
    net_list = []

    with self.assertRaises(TypeError):
        self.ns._prepare_vdu_interfaces(
            target_vdu,
            extra_dict,
            ns_preffix,
            vnf_preffix,
            self.logger,
            tasks_by_target_record_id,
            net_list,
        )

    self.assertEqual(
        mock_vld_information_of_interface.call_args_list[0][0],
        (first_interface, ns_preffix, vnf_preffix),
    )
    self.assertEqual(
        mock_port_security.call_args_list[0].args[0], first_interface
    )
    self.assertEqual(
        mock_item_of_interface.call_args_list[0][0],
        (first_interface, first_net_text),
    )

    mock_type_of_interface.assert_not_called()
    self.logger.error.assert_not_called()
    self.assertEqual(net_list, [])
    self.assertEqual(
        extra_dict,
        {
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": [first_net_text],
        },
    )

4535 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_vld_information_is_empty(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """Empty vld information makes every interface be reported and skipped.

    One error is logged per interface, ``net_list`` and ``extra_dict`` stay
    untouched and none of the remaining helpers is called.
    """
    interfaces = [
        {
            "name": "vdu-eth1",
            "ns-vld-id": "net1",
            "ip-address": "13.2.12.31",
            "mgmt-interface": True,
        },
        {
            "name": "vdu-eth2",
            "vnf-vld-id": "net2",
            "mac-address": "d0:94:66:ed:fc:e2",
        },
        {
            "name": "vdu-eth3",
            "ns-vld-id": "mgmtnet",
        },
    ]
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["interfaces"] = interfaces
    extra_dict = {
        "params": "test_params",
        "find_params": "test_find_params",
        "depends_on": [],
    }
    mock_vld_information_of_interface.side_effect = [""] * 3
    net_list = []

    self.ns._prepare_vdu_interfaces(
        target_vdu,
        extra_dict,
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        net_list,
    )

    vld_calls = mock_vld_information_of_interface.call_args_list
    for position, interface in enumerate(interfaces):
        self.assertEqual(
            vld_calls[position][0], (interface, ns_preffix, vnf_preffix)
        )

    logged_errors = self.logger.error.call_args_list
    expected_messages = (
        "Interface 0 from vdu several_volumes-VM not connected to any vld",
        "Interface 1 from vdu several_volumes-VM not connected to any vld",
        "Interface 2 from vdu several_volumes-VM not connected to any vld",
    )
    for position, message in enumerate(expected_messages):
        self.assertEqual(logged_errors[position][0], (message,))

    self.assertEqual(net_list, [])
    self.assertEqual(
        extra_dict,
        {
            "params": "test_params",
            "find_params": "test_find_params",
            "depends_on": [],
        },
    )

    for untouched_mock in (
        mock_item_of_interface,
        mock_port_security,
        mock_type_of_interface,
    ):
        untouched_mock.assert_not_called()

4624 

@patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_interface_port_security")
@patch("osm_ng_ro.ns.Ns._create_net_item_of_interface")
@patch("osm_ng_ro.ns.Ns._prepare_type_of_interface")
def test_prepare_vdu_interfaces_empty_interface_list(
    self,
    mock_type_of_interface,
    mock_item_of_interface,
    mock_port_security,
    mock_vld_information_of_interface,
):
    """With no interfaces in the vdu none of the helpers is called."""
    vdu_without_interfaces = deepcopy(target_vdu_wth_persistent_storage)
    vdu_without_interfaces["interfaces"] = []

    self.ns._prepare_vdu_interfaces(
        vdu_without_interfaces,
        {},  # extra_dict
        ns_preffix,
        vnf_preffix,
        self.logger,
        tasks_by_target_record_id,
        [],  # net_list
    )

    for untouched_mock in (
        mock_type_of_interface,
        mock_vld_information_of_interface,
        mock_item_of_interface,
        mock_port_security,
    ):
        untouched_mock.assert_not_called()

4654 

def test_prepare_vdu_ssh_keys(self):
    """Vdu ssh-keys and the RO public key both land in "key-pairs".

    The vdu carries its own ssh-keys and requires ssh access, so the
    resulting cloud config lists the vdu key followed by the RO key.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu.update(
        {"ssh-keys": ["sample-ssh-key"], "ssh-access-required": True}
    )
    cloud_config = {}

    self.ns._prepare_vdu_ssh_keys(
        target_vdu, {"public_key": "path_of_public_key"}, cloud_config
    )

    self.assertDictEqual(
        cloud_config,
        {"key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}]},
    )

4667 

def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self):
    """Without vdu ssh-keys only the RO public key lands in "key-pairs"."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["ssh-access-required"] = True
    cloud_config = {}

    self.ns._prepare_vdu_ssh_keys(
        target_vdu, {"public_key": "path_of_public_key"}, cloud_config
    )

    self.assertDictEqual(
        cloud_config, {"key-pairs": [{"public_key": "path_of_public_key"}]}
    )

4677 

def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self):
    """When ssh access is not required only the vdu ssh-keys are used."""
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu.update(
        {"ssh-keys": ["sample-ssh-key"], "ssh-access-required": False}
    )
    cloud_config = {}

    self.ns._prepare_vdu_ssh_keys(
        target_vdu, {"public_key": "path_of_public_key"}, cloud_config
    )

    self.assertDictEqual(cloud_config, {"key-pairs": ["sample-ssh-key"]})

4688 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_false(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent root disk is appended to disk_list with keep=False.

    ``_select_persistent_root_disk`` returns the root disk and
    ``is_volume_keeping_required`` reports False; both must be called once.
    """
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_select_persistent_root_disk.return_value = root_disk
    mock_volume_keeping_required.return_value = False
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        vnfd, target_vdu, {}, disk_list
    )

    self.assertEqual(
        disk_list,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": False}],
    )
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_called_once()

4720 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """AttributeError from _select_persistent_root_disk is propagated.

    The disk list must stay empty, the selection helper must have been
    called exactly once and ``is_volume_keeping_required`` must not be
    reached.
    """
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    mock_select_persistent_root_disk.side_effect = AttributeError
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    vnfd["virtual-storage-desc"][1] = root_disk
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    persistent_root_disk = {}
    disk_list = []

    with self.assertRaises(AttributeError):
        self.ns._add_persistent_root_disk_to_disk_list(
            vnfd, target_vdu, persistent_root_disk, disk_list
        )

    # Post-conditions are verified after the context manager exits;
    # statements following the raising call inside the ``with`` body
    # would never execute.
    self.assertEqual(disk_list, [])
    mock_select_persistent_root_disk.assert_called_once()
    mock_volume_keeping_required.assert_not_called()

4745 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_keep_true(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Persistent root disk is appended to disk_list with keep=True.

    ``is_volume_keeping_required`` reports True and must be called once
    with the root disk returned by ``_select_persistent_root_disk``.
    """
    root_disk = {
        "id": "persistent-root-volume",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
        "vdu-storage-requirements": [
            {"key": "keep-volume", "value": "true"},
        ],
    }
    mock_select_persistent_root_disk.return_value = root_disk
    mock_volume_keeping_required.return_value = True
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        vnfd, target_vdu, {}, disk_list
    )

    self.assertEqual(
        disk_list,
        [{"image_id": "ubuntu20.04", "size": "10", "keep": True}],
    )
    mock_volume_keeping_required.assert_called_once_with(root_disk)

4778 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list(
    self, mock_volume_keeping_required
):
    """Add persistent ordinary disk, keep volume set to False.

    ``is_volume_keeping_required`` reports False, so the ordinary disk is
    expected in ``disk_list`` with ``keep`` False and ``multiattach`` False.
    The helper must be called once with the ordinary disk descriptor.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    mock_volume_keeping_required.return_value = False
    # Only the root volume is already known; it must not be added again.
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    ordinary_disk = {
        "id": "persistent-volume2",
        "type-of-storage": "persistent-storage:persistent-storage",
        "size-of-storage": "10",
    }
    persistent_ordinary_disk = {}
    disk_list = []
    extra_dict = {}
    expected_disk_list = [
        {
            "size": "10",
            "keep": False,
            "multiattach": False,
            "name": "persistent-volume2",
        }
    ]

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        target_vdu,
        persistent_root_disk,
        persistent_ordinary_disk,
        disk_list,
        extra_dict,
    )

    self.assertEqual(disk_list, expected_disk_list)
    mock_volume_keeping_required.assert_called_once_with(ordinary_disk)

4818 

@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict(
    self, mock_volume_keeping_required
):
    """Ordinary disk already listed in the root disk dict is not added.

    "persistent-volume2" is present in ``persistent_root_disk``, so
    ``disk_list`` stays empty and ``is_volume_keeping_required`` is not
    called.
    """
    mock_volume_keeping_required.return_value = False
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    known_root_disks = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        },
        "persistent-volume2": {
            "size": "10",
        },
    }
    disk_list = []

    self.ns._add_persistent_ordinary_disks_to_disk_list(
        target_vdu,
        known_root_disks,
        {},  # persistent_ordinary_disk
        disk_list,
        {},  # extra_dict
    )

    self.assertEqual(disk_list, [])
    mock_volume_keeping_required.assert_not_called()

4849 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """VNFD without persistent storage leaves disk_list empty.

    ``_select_persistent_root_disk`` returns None on both of its calls and
    ``is_volume_keeping_required`` is never called.
    """
    mock_select_persistent_root_disk.return_value = None
    vnfd = deepcopy(vnfd_wthout_persistent_storage)
    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        vnfd, target_vdu, {}, disk_list
    )

    self.assertEqual(disk_list, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()

4867 

@patch("osm_ng_ro.ns.Ns._select_persistent_root_disk")
@patch("osm_ng_ro.ns.Ns.is_volume_keeping_required")
def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk(
    self, mock_volume_keeping_required, mock_select_persistent_root_disk
):
    """Empty persistent_root_disk dict and no root disk selected.

    ``_select_persistent_root_disk`` returns None on both of its calls, so
    nothing is appended to ``disk_list`` and ``is_volume_keeping_required``
    is never called.
    """
    # NOTE(review): fixtures and assertions here are the same as in
    # test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage;
    # confirm whether a different fixture was intended for this case.
    mock_select_persistent_root_disk.return_value = None
    vnfd = deepcopy(vnfd_wthout_persistent_storage)
    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    disk_list = []

    self.ns._add_persistent_root_disk_to_disk_list(
        vnfd, target_vdu, {}, disk_list
    )

    self.assertEqual(disk_list, [])
    self.assertEqual(mock_select_persistent_root_disk.call_count, 2)
    mock_volume_keeping_required.assert_not_called()

4885 

def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self):
    """Empty extra_dict makes the affinity group preparation fail.

    An NsException with the message "Invalid extra_dict format." is
    expected when the vdu references an affinity group but ``extra_dict``
    is empty.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Group ids are given as a list, like in the other affinity group tests;
    # a bare string would be iterated character by character.
    target_vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    extra_dict = {}
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"

    with self.assertRaises(NsException) as err:
        self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix)

    # The message is checked once the context manager has captured the
    # exception; inside the ``with`` body this line would be unreachable.
    self.assertEqual(str(err.exception), "Invalid extra_dict format.")

4895 

def test_prepare_vdu_affinity_group_list_one_affinity_group(self):
    """A single affinity group yields one entry and one dependency.

    The returned entry references the group through a "TASK-" prefixed id
    and the same group text is appended to ``extra_dict["depends_on"]``.
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_text = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id"
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"]
    extra_dict = {"depends_on": []}

    result = self.ns._prepare_vdu_affinity_group_list(
        target_vdu, extra_dict, ns_preffix
    )

    self.assertDictEqual(extra_dict, {"depends_on": [group_text]})
    self.assertEqual(result, [{"affinity_group_id": "TASK-" + group_text}])

4910 

def test_prepare_vdu_affinity_group_list_several_affinity_groups(self):
    """Two affinity groups yield two entries and two dependencies.

    Entries and dependencies are expected in the order in which the group
    ids appear in the vdu.
    """
    ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3"
    group_texts = [
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1",
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2",
    ]
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)
    target_vdu["affinity-or-anti-affinity-group-id"] = [
        "affinity-group-id1",
        "affinity-group-id2",
    ]
    extra_dict = {"depends_on": []}

    result = self.ns._prepare_vdu_affinity_group_list(
        target_vdu, extra_dict, ns_preffix
    )

    self.assertDictEqual(extra_dict, {"depends_on": group_texts})
    self.assertEqual(
        result,
        [{"affinity_group_id": "TASK-" + text} for text in group_texts],
    )

4932 

def test_prepare_vdu_affinity_group_list_no_affinity_group(self):
    """Without affinity groups the result is empty and extra_dict unchanged."""
    extra_dict = {"depends_on": []}

    result = self.ns._prepare_vdu_affinity_group_list(
        deepcopy(target_vdu_wth_persistent_storage),
        extra_dict,
        "nsrs:th47f48-9870-4169-b758-9732e1ff40f3",
    )

    self.assertDictEqual(extra_dict, {"depends_on": []})
    self.assertEqual(result, [])

4943 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_with_inst_vol_list(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Instantiation volume list is provided through additionalParams.

    With ``vdu_volumes`` given under ``additionalParams["OSM"]`` the test
    expects ``find_persistent_volumes`` to be called with the root disk
    returned by ``find_persistent_root_volumes`` and that volume list, and
    expects neither ``_add_persistent_root_disk_to_disk_list`` nor
    ``_add_persistent_ordinary_disks_to_disk_list`` to be called.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    # Copied so that the module-level fixture cannot be altered by this test.
    target_vdu["interfaces"] = deepcopy(interfaces_wth_all_positions)

    vdu_instantiation_vol_list = [
        {
            "vim-volume-id": vim_volume_id,
            "name": "persistent-volume2",
        }
    ]
    target_vdu["additionalParams"] = {
        "OSM": {"vdu_volumes": vdu_instantiation_vol_list}
    }
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
        }
    }
    mock_find_persistent_root_volumes.return_value = persistent_root_disk

    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    expected_extra_dict_copy = deepcopy(expected_extra_dict)
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
    )

    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        expected_extra_dict_copy,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(result, expected_extra_dict_copy)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_called_once_with(
        persistent_root_disk, target_vdu, vdu_instantiation_vol_list, []
    )

5026 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_affinity_groups(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """There are affinity groups.

    The list returned by ``_prepare_vdu_affinity_group_list`` is expected
    under ``params["affinity_group_list"]`` of the resulting extra dict.
    No instantiation volume list is given, so ``find_persistent_volumes``
    must not be called while both ``_add_persistent_*`` helpers are called
    once.
    """
    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)

    self.maxDiff = None
    # Copied so that the module-level fixture cannot be altered by this test.
    target_vdu["interfaces"] = deepcopy(interfaces_wth_all_positions)
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = [
        "affinity_group_1",
        "affinity_group_2",
    ]

    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    expected_extra_dict3 = deepcopy(expected_extra_dict2)
    expected_extra_dict3["params"]["affinity_group_list"] = [
        "affinity_group_1",
        "affinity_group_2",
    ]
    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
    )

    self.assertDictEqual(result, expected_extra_dict3)
    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        expected_extra_dict3,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )

    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()

5099 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wth_cloud_config(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Cloud config from cloud-init is put in the resulting params.

    ``_prepare_vdu_cloud_init`` returns user data and a boot data drive;
    the same content is expected under ``params["cloud_config"]`` and as
    the cloud config argument handed to ``_prepare_vdu_ssh_keys``.
    """

    def fresh_cloud_config():
        # A new dict per use, so no expectation shares an object with the
        # value returned by the mock.
        return {
            "user-data": user_data,
            "boot-data-drive": "vda",
        }

    self.maxDiff = None
    target_vdu = deepcopy(target_vdu_wthout_persistent_storage)
    target_vdu["interfaces"] = interfaces_wth_all_positions
    mock_prepare_vdu_cloud_init.return_value = fresh_cloud_config()
    mock_prepare_vdu_affinity_group_list.return_value = []

    new_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    expected_extra_dict3 = deepcopy(expected_extra_dict2)
    expected_extra_dict3["params"]["cloud_config"] = fresh_cloud_config()
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    result = Ns._process_vdu_params(
        target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
    )

    mock_sort_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        expected_extra_dict3,
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )
    self.assertDictEqual(result, expected_extra_dict3)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(
        target_vdu, None, fresh_cloud_config()
    )
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()

5173 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_wthout_persistent_storage(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Process a VDU whose descriptor carries no persistent storage.

    The VDU is given the ``interfaces_wth_all_positions`` fixture, so the
    sort helper is expected to run and the partial-locate helper is not.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wthout_persistent_storage)
    vdu["interfaces"] = interfaces_wth_all_positions

    # Canned answers for the patched collaborators.
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []

    # The shared kwargs are copied before the db mock is configured below.
    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    wanted_extra_dict = deepcopy(expected_extra_dict2)
    db.get_one.return_value = deepcopy(vnfd_wthout_persistent_storage)

    outcome = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )

    # Interface ordering: fully positioned, hence sorted rather than located.
    mock_sort_vdu_interfaces.assert_called_once_with(vdu)
    mock_locate_vdu_interfaces.assert_not_called()
    # Cloud-init and disk helpers each run exactly once.
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu, wanted_extra_dict, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(outcome, wanted_extra_dict)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()

5238 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_interfaces_partially_located(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Process a VDU where only one of three interfaces has a position.

    With positions only partly supplied, the partial-locate helper is
    expected to be used and the sort helper must stay untouched.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # Only the middle interface carries a "position" key.
    first_iface = {"name": "vdu-eth1", "ns-vld-id": "net1"}
    second_iface = {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2}
    third_iface = {"name": "vdu-eth3", "ns-vld-id": "mgmtnet"}
    vdu["interfaces"] = [first_iface, second_iface, third_iface]

    # Canned answers for the patched collaborators.
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    # The shared kwargs are copied before the db mock is configured below.
    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    outcome = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )
    wanted_extra_dict = deepcopy(expected_extra_dict)

    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu, wanted_extra_dict, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(outcome, wanted_extra_dict)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()

5323 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_no_interface_position(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Process a VDU using the ``interfaces_wthout_positions`` fixture.

    The partial-locate helper is expected to handle the interfaces and the
    sort helper must not be called.
    """
    self.maxDiff = None
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["interfaces"] = interfaces_wthout_positions

    # Canned answers for the patched collaborators.
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_find_persistent_root_volumes.return_value = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }

    # The shared kwargs are copied before the db mock is configured below.
    call_kwargs = {
        **deepcopy(kwargs),
        "vnfr_id": vnfr_id,
        "nsr_id": nsr_id,
        "tasks_by_target_record_id": {},
        "logger": "logger",
    }
    db.get_one.return_value = deepcopy(vnfd_wth_persistent_storage)

    outcome = Ns._process_vdu_params(
        vdu, indata, vim_info=None, target_record_id=None, **call_kwargs
    )
    wanted_extra_dict = deepcopy(expected_extra_dict)

    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once()
    mock_prepare_vdu_interfaces.assert_called_once_with(
        vdu, wanted_extra_dict, ns_preffix, vnf_preffix, "logger", {}, []
    )
    self.assertDictEqual(outcome, wanted_extra_dict)
    mock_prepare_vdu_ssh_keys.assert_called_once_with(vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_called_once()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()

5397 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_prepare_vdu_interfaces_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Prepare vdu interfaces method raises exception.

    The patched ``_prepare_vdu_interfaces`` raises ``TypeError``; the test
    checks that this exact exception type reaches the caller and that none
    of the later helpers (cloud-init, disks, ssh keys, affinity groups) ran.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    persistent_root_disk = {
        "persistent-root-volume": {
            "image_id": "ubuntu20.04",
            "size": "10",
            "keep": True,
        }
    }
    mock_find_persistent_root_volumes.return_value = persistent_root_disk
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )
    mock_prepare_vdu_interfaces.side_effect = TypeError

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Assert the concrete exception type directly. The previous form caught
    # any Exception and compared type() of the assertRaises context object
    # (not err.exception), so the type was never actually verified.
    with self.assertRaises(TypeError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_not_called()
    mock_add_persistent_root_disk_to_disk_list.assert_not_called()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    mock_prepare_vdu_interfaces.assert_called_once()
    mock_prepare_vdu_ssh_keys.assert_not_called()
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()

5464 

@patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys")
@patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes")
@patch("osm_ng_ro.ns.Ns.find_persistent_volumes")
@patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list")
@patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list")
@patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list")
def test_process_vdu_params_add_persistent_root_disk_raises_exception(
    self,
    mock_prepare_vdu_affinity_group_list,
    mock_add_persistent_ordinary_disks_to_disk_list,
    mock_add_persistent_root_disk_to_disk_list,
    mock_find_persistent_volumes,
    mock_find_persistent_root_volumes,
    mock_prepare_vdu_ssh_keys,
    mock_prepare_vdu_cloud_init,
    mock_prepare_vdu_interfaces,
    mock_locate_vdu_interfaces,
    mock_sort_vdu_interfaces,
):
    """Add persistent root disk method raises exception.

    The patched ``_add_persistent_root_disk_to_disk_list`` raises
    ``KeyError``; the test checks that this exact exception type reaches the
    caller, that the helpers before it ran, and that the ordinary-disk and
    affinity-group helpers were never reached.
    """
    target_vdu = deepcopy(target_vdu_wth_persistent_storage)

    self.maxDiff = None
    target_vdu["interfaces"] = interfaces_wthout_positions
    mock_prepare_vdu_cloud_init.return_value = {}
    mock_prepare_vdu_affinity_group_list.return_value = []
    mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError
    new_kwargs = deepcopy(kwargs)
    new_kwargs.update(
        {
            "vnfr_id": vnfr_id,
            "nsr_id": nsr_id,
            "tasks_by_target_record_id": {},
            "logger": "logger",
        }
    )

    vnfd = deepcopy(vnfd_wth_persistent_storage)
    db.get_one.return_value = vnfd
    # Assert the concrete exception type directly. The previous form caught
    # any Exception and compared type() of the assertRaises context object
    # (not err.exception), so the type was never actually verified.
    with self.assertRaises(KeyError):
        Ns._process_vdu_params(
            target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs
        )
    mock_sort_vdu_interfaces.assert_not_called()
    mock_locate_vdu_interfaces.assert_called_once_with(target_vdu)
    mock_prepare_vdu_cloud_init.assert_called_once()
    mock_add_persistent_root_disk_to_disk_list.assert_called_once()
    mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called()
    # Only "depends_on" is expected in the extra dict handed to the
    # interface helper in this failure scenario.
    mock_prepare_vdu_interfaces.assert_called_once_with(
        target_vdu,
        {
            "depends_on": [
                f"{ns_preffix}:image.0",
                f"{ns_preffix}:flavor.0",
            ]
        },
        ns_preffix,
        vnf_preffix,
        "logger",
        {},
        [],
    )

    mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {})
    mock_prepare_vdu_affinity_group_list.assert_not_called()
    mock_find_persistent_volumes.assert_not_called()
    mock_find_persistent_root_volumes.assert_not_called()

5537 

def test_select_persistent_root_disk(self):
    """The vsd is returned when the VDU lists persistent-root-volume first."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-root-volume",
        "persistent-volume2",
        "ephemeral-volume",
    ]
    selected = Ns._select_persistent_root_disk(storage_desc, vdu)
    # The very same descriptor that was passed in is the expected outcome.
    self.assertEqual(selected, storage_desc)

5549 

def test_select_persistent_root_disk_first_vsd_is_different(self):
    """Nothing is selected when the VDU's first virtual-storage-desc differs from the vsd id."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    # persistent-root-volume is present, but not in first position.
    vdu["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    selected = Ns._select_persistent_root_disk(storage_desc, vdu)
    self.assertEqual(selected, None)

5562 

def test_select_persistent_root_disk_vsd_is_not_persistent(self):
    """Nothing is selected when the vsd storage type is ephemeral."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    # Override the storage type so the descriptor is no longer persistent.
    storage_desc["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage"
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    selected = Ns._select_persistent_root_disk(storage_desc, vdu)
    self.assertEqual(selected, None)

5576 

def test_select_persistent_root_disk_vsd_does_not_have_size(self):
    """Nothing is selected when the vsd size-of-storage is None."""
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    # Blank out the size on the descriptor under test.
    storage_desc["size-of-storage"] = None
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    vdu["virtual-storage-desc"] = [
        "persistent-volume2",
        "persistent-root-volume",
        "ephemeral-volume",
    ]
    selected = Ns._select_persistent_root_disk(storage_desc, vdu)
    self.assertEqual(selected, None)

5590 

def test_select_persistent_root_disk_vdu_wthout_vsd(self):
    """Nothing is selected for a VDU that has no virtual-storage-desc."""
    # The target VDU fixture is used as-is, with no virtual-storage-desc added.
    storage_desc = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    selected = Ns._select_persistent_root_disk(storage_desc, vdu)
    self.assertEqual(selected, None)

5598 

def test_select_persistent_root_disk_invalid_vsd_type(self):
    """Passing the whole vsd list instead of a single dict raises AttributeError."""
    storage_desc_list = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"]
    vdu = deepcopy(target_vdu_wth_persistent_storage)
    self.assertRaises(
        AttributeError, Ns._select_persistent_root_disk, storage_desc_list, vdu
    )

5605 

5606 

class TestSFC(unittest.TestCase):
    """Tests for the Ns helpers that build SFC task params and parse VNFGD entries."""

    def setUp(self):
        """Give every test a fresh Ns instance and a copying mock logger."""
        self.ns = Ns()
        self.logger = CopyingMock(autospec=True)

    @staticmethod
    def _single_vnfr_records():
        """Build the one-entry VNFR mapping shared by the sfi/classification tests."""
        connection_point = {
            "name": "vnf-cp0-ext",
            "connection-point-id": "vdu-eth0-int",
            "connection-point-vdu-id": "5678",
            "id": "vnf-cp0-ext",
        }
        return {
            "1234": {
                "id": "1234",
                "member-vnf-index-ref": "vnf",
                "connection-point": [connection_point],
            }
        }

    @patch("osm_ng_ro.ns.Ns._prefix_ip_address")
    @patch("osm_ng_ro.ns.Ns._process_ip_proto")
    @patch("osm_ng_ro.ns.Ns._get_vnfr_vdur_text")
    def test_process_classification_params(
        self, mock_get_vnfr_vdur_text, mock_process_ip_proto, mock_prefix_ip_address
    ):
        """Classification params are built from the rule and the patched helpers."""
        # Patched helpers: two prefixed addresses (consumed in order), a
        # protocol name and the vdur text.
        mock_prefix_ip_address.side_effect = ["10.10.10.10/32", "20.20.20.20/32"]
        mock_process_ip_proto.return_value = "tcp"
        mock_get_vnfr_vdur_text.return_value = "vdur_text"

        vim_info = {}
        indata = {}
        rule = {
            "vnfr_id": "1234",
            "source-ip-address": "10.10.10.10",
            "destination-ip-address": "20.20.20.20",
            "ip-proto": "6",
            "id": "rule1",
            "source-port": "0",
            "destination-port": 5555,
            "vdur_id": "5678",
            "ingress_port_index": 0,
            "vim_info": vim_info,
        }
        wanted_params = {
            "destination_ip_prefix": "20.20.20.20/32",
            "destination_port_range_max": 5555,
            "destination_port_range_min": 5555,
            "logical_source_port": "TASK-vdur_text",
            "logical_source_port_index": 0,
            "name": "rule1",
            "protocol": "tcp",
            "source_ip_prefix": "10.10.10.10/32",
            "source_port_range_max": "0",
            "source_port_range_min": "0",
        }
        wanted = {"depends_on": ["vdur_text"], "params": wanted_params}

        outcome = self.ns._process_classification_params(
            rule, indata, vim_info, "", db=Mock()
        )
        self.assertEqual(wanted, outcome)

    def test_process_sfp_params(self):
        """SFP params reference the sf and classification tasks of the given nsr_id."""
        sf_ref = "nsrs:1234:sf.sf1"
        classification_ref = "nsrs:1234:classification.rule1"
        vim_info = {}
        indata = {}
        sfp = {
            "id": "sfp1",
            "sfs": ["sf1"],
            "classifications": ["rule1"],
            "vim_info": vim_info,
        }
        wanted = {
            "depends_on": [sf_ref, classification_ref],
            "params": {
                "name": "sfp1",
                "sfs": [f"TASK-{sf_ref}"],
                "classifications": [f"TASK-{classification_ref}"],
            },
        }

        outcome = self.ns._process_sfp_params(
            sfp, indata, vim_info, "", nsr_id="1234"
        )
        self.assertEqual(wanted, outcome)

    def test_process_sf_params(self):
        """SF params reference the sfi task; only ``ns_id`` is passed as a keyword."""
        sfi_ref = "nsrs::sfi.sfi1"
        vim_info = {}
        indata = {}
        sf = {"id": "sf1", "sfis": ["sfi1"], "vim_info": vim_info}
        wanted = {
            "depends_on": [sfi_ref],
            "params": {"name": "sf1", "sfis": [f"TASK-{sfi_ref}"]},
        }

        outcome = self.ns._process_sf_params(sf, indata, vim_info, "", ns_id="1234")
        self.assertEqual(wanted, outcome)

    @patch("osm_ng_ro.ns.Ns._get_vnfr_vdur_text")
    def test_process_sfi_params(self, mock_get_vnfr_vdur_text):
        """SFI params point both ports at the task named by the patched vdur text."""
        mock_get_vnfr_vdur_text.return_value = "vdur_text"
        vim_info = {}
        indata = {}
        sfi = {
            "id": "sfi1",
            "ingress_port": "vnf-cp0-ext",
            "egress_port": "vnf-cp0-ext",
            "vnfr_id": "1234",
            "vdur_id": "5678",
            "ingress_port_index": 0,
            "egress_port_index": 0,
            "vim_info": {},
        }
        wanted = {
            "depends_on": ["vdur_text"],
            "params": {
                "name": "sfi1",
                "ingress_port": "TASK-vdur_text",
                "egress_port": "TASK-vdur_text",
                "ingress_port_index": 0,
                "egress_port_index": 0,
            },
        }

        outcome = self.ns._process_sfi_params(sfi, indata, vim_info, "", db=Mock())
        self.assertEqual(wanted, outcome)

    def test_process_vnfgd_sfp(self):
        """A VNFGD sfp is flattened into its sf ids and classification ids."""
        position = {
            "id": "position1",
            "cp-profile-id": [{"id": "sf1"}],
            "match-attributes": [{"id": "rule1"}],
        }
        sfp = {"id": "sfp1", "position-desc-id": [position]}
        wanted = {"id": "sfp1", "sfs": ["sf1"], "classifications": ["rule1"]}

        self.assertEqual(wanted, self.ns._process_vnfgd_sfp(sfp))

    def test_process_vnfgd_sf(self):
        """A VNFGD sf is reduced to its id and the ids of its profile elements."""
        profile_element = {"id": "sfi1", "order": 0}
        sf = {"id": "sf1", "constituent-profile-elements": [profile_element]}
        wanted = {"id": "sf1", "sfis": ["sfi1"]}

        self.assertEqual(wanted, self.ns._process_vnfgd_sf(sf))

    def test_process_vnfgd_sfi(self):
        """A VNFGD sfi is resolved against the VNFR records to vnfr/vdur ids."""
        sfi = {
            "id": "sfi1",
            "constituent-base-element-id": "vnf",
            "order": 0,
            "ingress-constituent-cpd-id": "vnf-cp0-ext",
            "egress-constituent-cpd-id": "vnf-cp0-ext",
        }
        wanted = {
            "id": "sfi1",
            "ingress_port": "vnf-cp0-ext",
            "egress_port": "vnf-cp0-ext",
            "vnfr_id": "1234",
            "vdur_id": "5678",
            "ingress_port_index": 0,
            "egress_port_index": 0,
        }

        outcome = self.ns._process_vnfgd_sfi(sfi, self._single_vnfr_records())
        self.assertEqual(wanted, outcome)

    def test_process_vnfgd_classification(self):
        """A VNFGD classification keeps its fields and gains vnfr/vdur/port info."""
        classification = {
            "id": "rule1",
            "ip-proto": 6,
            "source-ip-address": "10.10.10.10",
            "destination-ip-address": "20.20.20.20",
            "constituent-base-element-id": "vnf",
            "constituent-cpd-id": "vnf-cp0-ext",
            "destination-port": 5555,
        }
        wanted = {
            "id": "rule1",
            "ip-proto": 6,
            "source-ip-address": "10.10.10.10",
            "destination-ip-address": "20.20.20.20",
            "destination-port": 5555,
            "vnfr_id": "1234",
            "vdur_id": "5678",
            "ingress_port_index": 0,
            "constituent-base-element-id": "vnf",
            "constituent-cpd-id": "vnf-cp0-ext",
        }

        outcome = self.ns._process_vnfgd_classification(
            classification, self._single_vnfr_records()
        )
        self.assertEqual(wanted, outcome)