Coverage for NG-RO/osm_ng_ro/tests/test_ns.py: 99%

1909 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-07-02 09:11 +0000

1####################################################################################### 

2# Copyright ETSI Contributors and Others. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16####################################################################################### 

17from copy import deepcopy 

18import unittest 

19from unittest.mock import MagicMock, Mock, patch 

20 

21from jinja2 import ( 

22 Environment, 

23 select_autoescape, 

24 StrictUndefined, 

25 TemplateError, 

26 TemplateNotFound, 

27 UndefinedError, 

28) 

29from osm_ng_ro.ns import Ns, NsException 

30 

31 

32__author__ = "Eduardo Sousa" 

33__date__ = "$19-NOV-2021 00:00:00$" 

34 

35 

36# Variables used in Tests 

37vnfd_wth_persistent_storage = { 

38 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

39 "df": [ 

40 { 

41 "id": "default-df", 

42 "vdu-profile": [{"id": "several_volumes-VM", "min-number-of-instances": 1}], 

43 } 

44 ], 

45 "id": "several_volumes-vnf", 

46 "product-name": "several_volumes-vnf", 

47 "vdu": [ 

48 { 

49 "id": "several_volumes-VM", 

50 "name": "several_volumes-VM", 

51 "sw-image-desc": "ubuntu20.04", 

52 "alternative-sw-image-desc": [ 

53 "ubuntu20.04-aws", 

54 "ubuntu20.04-azure", 

55 ], 

56 "virtual-storage-desc": [ 

57 "persistent-root-volume", 

58 "persistent-volume2", 

59 "ephemeral-volume", 

60 ], 

61 } 

62 ], 

63 "version": "1.0", 

64 "virtual-storage-desc": [ 

65 { 

66 "id": "persistent-volume2", 

67 "type-of-storage": "persistent-storage:persistent-storage", 

68 "size-of-storage": "10", 

69 }, 

70 { 

71 "id": "persistent-root-volume", 

72 "type-of-storage": "persistent-storage:persistent-storage", 

73 "size-of-storage": "10", 

74 "vdu-storage-requirements": [ 

75 {"key": "keep-volume", "value": "true"}, 

76 ], 

77 }, 

78 { 

79 "id": "ephemeral-volume", 

80 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

81 "size-of-storage": "1", 

82 }, 

83 ], 

84 "_admin": { 

85 "storage": { 

86 "fs": "mongo", 

87 "path": "/app/storage/", 

88 }, 

89 "type": "vnfd", 

90 }, 

91} 

92vim_volume_id = "ru937f49-3870-4169-b758-9732e1ff40f3" 

93task_by_target_record_id = { 

94 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": { 

95 "extra_dict": {"params": {"net_type": "SR-IOV"}} 

96 } 

97} 

98interfaces_wthout_positions = [ 

99 { 

100 "name": "vdu-eth1", 

101 "ns-vld-id": "net1", 

102 }, 

103 { 

104 "name": "vdu-eth2", 

105 "ns-vld-id": "net2", 

106 }, 

107 { 

108 "name": "vdu-eth3", 

109 "ns-vld-id": "mgmtnet", 

110 }, 

111] 

112interfaces_wth_all_positions = [ 

113 { 

114 "name": "vdu-eth1", 

115 "ns-vld-id": "net1", 

116 "position": 2, 

117 }, 

118 { 

119 "name": "vdu-eth2", 

120 "ns-vld-id": "net2", 

121 "position": 0, 

122 }, 

123 { 

124 "name": "vdu-eth3", 

125 "ns-vld-id": "mgmtnet", 

126 "position": 1, 

127 }, 

128] 

129target_vdu_wth_persistent_storage = { 

130 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960", 

131 "ns-flavor-id": "0", 

132 "ns-image-id": "0", 

133 "vdu-name": "several_volumes-VM", 

134 "interfaces": [ 

135 { 

136 "name": "vdu-eth0", 

137 "ns-vld-id": "mgmtnet", 

138 } 

139 ], 

140 "virtual-storages": [ 

141 { 

142 "id": "persistent-volume2", 

143 "size-of-storage": "10", 

144 "type-of-storage": "persistent-storage:persistent-storage", 

145 }, 

146 { 

147 "id": "persistent-root-volume", 

148 "size-of-storage": "10", 

149 "type-of-storage": "persistent-storage:persistent-storage", 

150 "vdu-storage-requirements": [ 

151 {"key": "keep-volume", "value": "true"}, 

152 ], 

153 }, 

154 { 

155 "id": "ephemeral-volume", 

156 "size-of-storage": "1", 

157 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

158 }, 

159 ], 

160} 

161db = MagicMock(name="database mock") 

162fs = MagicMock(name="database mock") 

163ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3" 

164vnf_preffix = "vnfrs:wh47f48-y870-4169-b758-5732e1ff40f5" 

165vnfr_id = "wh47f48-y870-4169-b758-5732e1ff40f5" 

166nsr_id = "th47f48-9870-4169-b758-9732e1ff40f3" 

167indata = { 

168 "name": "sample_name", 

169} 

170expected_extra_dict = { 

171 "depends_on": [ 

172 f"{ns_preffix}:image.0", 

173 f"{ns_preffix}:flavor.0", 

174 ], 

175 "params": { 

176 "affinity_group_list": [], 

177 "availability_zone_index": None, 

178 "availability_zone_list": None, 

179 "cloud_config": None, 

180 "description": "several_volumes-VM", 

181 "disk_list": [], 

182 "flavor_id": f"TASK-{ns_preffix}:flavor.0", 

183 "image_id": f"TASK-{ns_preffix}:image.0", 

184 "name": "sample_name-vnf-several-volumes-several_volumes-VM-0", 

185 "net_list": [], 

186 "start": True, 

187 }, 

188} 

189 

190expected_extra_dict2 = { 

191 "depends_on": [ 

192 f"{ns_preffix}:image.0", 

193 f"{ns_preffix}:flavor.0", 

194 ], 

195 "params": { 

196 "affinity_group_list": [], 

197 "availability_zone_index": None, 

198 "availability_zone_list": None, 

199 "cloud_config": None, 

200 "description": "without_volumes-VM", 

201 "disk_list": [], 

202 "flavor_id": f"TASK-{ns_preffix}:flavor.0", 

203 "image_id": f"TASK-{ns_preffix}:image.0", 

204 "name": "sample_name-vnf-several-volumes-without_volumes-VM-0", 

205 "net_list": [], 

206 "start": True, 

207 }, 

208} 

209 

210tasks_by_target_record_id = { 

211 "nsrs:th47f48-9870-4169-b758-9732e1ff40f3": { 

212 "extra_dict": { 

213 "params": { 

214 "net_type": "SR-IOV", 

215 } 

216 } 

217 } 

218} 

219kwargs = { 

220 "db": MagicMock(), 

221 "vdu2cloud_init": {}, 

222 "vnfr": { 

223 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

224 "member-vnf-index-ref": "vnf-several-volumes", 

225 }, 

226} 

227vnfd_wthout_persistent_storage = { 

228 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

229 "df": [ 

230 { 

231 "id": "default-df", 

232 "vdu-profile": [{"id": "without_volumes-VM", "min-number-of-instances": 1}], 

233 } 

234 ], 

235 "id": "without_volumes-vnf", 

236 "product-name": "without_volumes-vnf", 

237 "vdu": [ 

238 { 

239 "id": "without_volumes-VM", 

240 "name": "without_volumes-VM", 

241 "sw-image-desc": "ubuntu20.04", 

242 "alternative-sw-image-desc": [ 

243 "ubuntu20.04-aws", 

244 "ubuntu20.04-azure", 

245 ], 

246 "virtual-storage-desc": ["root-volume", "ephemeral-volume"], 

247 } 

248 ], 

249 "version": "1.0", 

250 "virtual-storage-desc": [ 

251 {"id": "root-volume", "size-of-storage": "10"}, 

252 { 

253 "id": "ephemeral-volume", 

254 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

255 "size-of-storage": "1", 

256 }, 

257 ], 

258 "_admin": { 

259 "storage": { 

260 "fs": "mongo", 

261 "path": "/app/storage/", 

262 }, 

263 "type": "vnfd", 

264 }, 

265} 

266 

267target_vdu_wthout_persistent_storage = { 

268 "_id": "09a0baa7-b7cb-4924-bd63-9f04a1c23960", 

269 "ns-flavor-id": "0", 

270 "ns-image-id": "0", 

271 "vdu-name": "without_volumes-VM", 

272 "interfaces": [ 

273 { 

274 "name": "vdu-eth0", 

275 "ns-vld-id": "mgmtnet", 

276 } 

277 ], 

278 "virtual-storages": [ 

279 { 

280 "id": "root-volume", 

281 "size-of-storage": "10", 

282 }, 

283 { 

284 "id": "ephemeral-volume", 

285 "size-of-storage": "1", 

286 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

287 }, 

288 ], 

289} 

290cloud_init_content = """ 

291disk_setup: 

292 ephemeral0: 

293 table_type: {{type}} 

294 layout: True 

295 overwrite: {{is_override}} 

296runcmd: 

297 - [ ls, -l, / ] 

298 - [ sh, -xc, "echo $(date) '{{command}}'" ] 

299""" 

300 

301user_data = """ 

302disk_setup: 

303 ephemeral0: 

304 table_type: mbr 

305 layout: True 

306 overwrite: False 

307runcmd: 

308 - [ ls, -l, / ] 

309 - [ sh, -xc, "echo $(date) '& rm -rf /'" ] 

310""" 

311vdu_id = "bb9c43f9-10a2-4569-a8a8-957c3528b6d1" 

312vnf_id = "665b4165-ce24-4320-bf19-b9a45bade49f" 

313target_vim = "vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

314action_id = "bb937f49-3870-4169-b758-9732e1ff40f3" 

315nsr_id_2 = "993166fe-723e-4680-ac4b-b1af2541ae31" 

316target_record_1 = "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

317target_record_id = ( 

318 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:" 

319 "vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1" 

320) 

321expected_result_vertical_scale = { 

322 "target_id": target_vim, 

323 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3", 

324 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31", 

325 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1", 

326 "status": "SCHEDULED", 

327 "action": "EXEC", 

328 "item": "verticalscale", 

329 "target_record": target_record_1, 

330 "target_record_id": target_record_id, 

331 "params": { 

332 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

333 "flavor_dict": "flavor_dict", 

334 "flavor_id": "TASK-nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0", 

335 }, 

336 "depends_on": ["nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0"], 

337} 

338vdu = { 

339 "id": vdu_id, 

340 "vim_info": {target_vim: {"interfaces": []}}, 

341 "ns-flavor-id": "0", 

342} 

343vnf = {"_id": vnf_id} 

344extra_dict_vertical_scale = { 

345 "params": { 

346 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

347 "flavor_dict": "flavor_dict", 

348 }, 

349} 

350extra_dict_migrate = { 

351 "params": { 

352 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

353 "migrate_host": "migrateToHost", 

354 }, 

355} 

356expected_result_migrate = { 

357 "target_id": target_vim, 

358 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3", 

359 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31", 

360 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1", 

361 "status": "SCHEDULED", 

362 "action": "EXEC", 

363 "item": "migrate", 

364 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35", 

365 "target_record_id": target_record_id, 

366 "params": { 

367 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

368 "migrate_host": "migrateToHost", 

369 }, 

370} 

371expected_result_rebuild_start_stop = { 

372 "target_id": target_vim, 

373 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3", 

374 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31", 

375 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:0", 

376 "status": "SCHEDULED", 

377 "action": "EXEC", 

378 "item": "update", 

379 "target_record_id": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1", 

380} 

381 

382 

383class TestException(Exception): 

384 pass 

385 

386 

387class CopyingMock(MagicMock): 

388 def __call__(self, *args, **kwargs): 

389 args = deepcopy(args) 

390 kwargs = deepcopy(kwargs) 

391 return super(CopyingMock, self).__call__(*args, **kwargs) 

392 

393 

394class TestNs(unittest.TestCase): 

395 def setUp(self): 

396 pass 

397 

398 def test__create_task_without_extra_dict(self): 

399 expected_result = { 

400 "target_id": "vim_openstack_1", 

401 "action_id": "123456", 

402 "nsr_id": "654321", 

403 "task_id": "123456:1", 

404 "status": "SCHEDULED", 

405 "action": "CREATE", 

406 "item": "test_item", 

407 "target_record": "test_target_record", 

408 "target_record_id": "test_target_record_id", 

409 } 

410 deployment_info = { 

411 "action_id": "123456", 

412 "nsr_id": "654321", 

413 "task_index": 1, 

414 } 

415 

416 task = Ns._create_task( 

417 deployment_info=deployment_info, 

418 target_id="vim_openstack_1", 

419 item="test_item", 

420 action="CREATE", 

421 target_record="test_target_record", 

422 target_record_id="test_target_record_id", 

423 ) 

424 

425 self.assertEqual(deployment_info.get("task_index"), 2) 

426 self.assertDictEqual(task, expected_result) 

427 

428 def test__create_task(self): 

429 expected_result = { 

430 "target_id": "vim_openstack_1", 

431 "action_id": "123456", 

432 "nsr_id": "654321", 

433 "task_id": "123456:1", 

434 "status": "SCHEDULED", 

435 "action": "CREATE", 

436 "item": "test_item", 

437 "target_record": "test_target_record", 

438 "target_record_id": "test_target_record_id", 

439 # values coming from extra_dict 

440 "params": "test_params", 

441 "find_params": "test_find_params", 

442 "depends_on": "test_depends_on", 

443 } 

444 deployment_info = { 

445 "action_id": "123456", 

446 "nsr_id": "654321", 

447 "task_index": 1, 

448 } 

449 

450 task = Ns._create_task( 

451 deployment_info=deployment_info, 

452 target_id="vim_openstack_1", 

453 item="test_item", 

454 action="CREATE", 

455 target_record="test_target_record", 

456 target_record_id="test_target_record_id", 

457 extra_dict={ 

458 "params": "test_params", 

459 "find_params": "test_find_params", 

460 "depends_on": "test_depends_on", 

461 }, 

462 ) 

463 

464 self.assertEqual(deployment_info.get("task_index"), 2) 

465 self.assertDictEqual(task, expected_result) 

466 

467 @patch("osm_ng_ro.ns.time") 

468 def test__create_ro_task(self, mock_time: Mock): 

469 now = 1637324838.994551 

470 mock_time.return_value = now 

471 task = { 

472 "target_id": "vim_openstack_1", 

473 "action_id": "123456", 

474 "nsr_id": "654321", 

475 "task_id": "123456:1", 

476 "status": "SCHEDULED", 

477 "action": "CREATE", 

478 "item": "test_item", 

479 "target_record": "test_target_record", 

480 "target_record_id": "test_target_record_id", 

481 # values coming from extra_dict 

482 "params": "test_params", 

483 "find_params": "test_find_params", 

484 "depends_on": "test_depends_on", 

485 } 

486 expected_result = { 

487 "_id": "123456:1", 

488 "locked_by": None, 

489 "locked_at": 0.0, 

490 "target_id": "vim_openstack_1", 

491 "vim_info": { 

492 "created": False, 

493 "created_items": None, 

494 "vim_id": None, 

495 "vim_name": None, 

496 "vim_status": None, 

497 "vim_details": None, 

498 "vim_message": None, 

499 "refresh_at": None, 

500 }, 

501 "modified_at": now, 

502 "created_at": now, 

503 "to_check_at": now, 

504 "tasks": [task], 

505 } 

506 

507 ro_task = Ns._create_ro_task( 

508 target_id="vim_openstack_1", 

509 task=task, 

510 ) 

511 

512 self.assertDictEqual(ro_task, expected_result) 

513 

514 def test__process_image_params_with_empty_target_image(self): 

515 expected_result = { 

516 "find_params": {}, 

517 } 

518 target_image = {} 

519 

520 result = Ns._process_image_params( 

521 target_image=target_image, 

522 indata=None, 

523 vim_info=None, 

524 target_record_id=None, 

525 ) 

526 

527 self.assertDictEqual(expected_result, result) 

528 

529 def test__process_image_params_with_wrong_target_image(self): 

530 expected_result = { 

531 "find_params": {}, 

532 } 

533 target_image = { 

534 "no_image": "to_see_here", 

535 } 

536 

537 result = Ns._process_image_params( 

538 target_image=target_image, 

539 indata=None, 

540 vim_info=None, 

541 target_record_id=None, 

542 ) 

543 

544 self.assertDictEqual(expected_result, result) 

545 

546 def test__process_image_params_with_image(self): 

547 expected_result = { 

548 "find_params": { 

549 "filter_dict": { 

550 "name": "cirros", 

551 }, 

552 }, 

553 } 

554 target_image = { 

555 "image": "cirros", 

556 } 

557 

558 result = Ns._process_image_params( 

559 target_image=target_image, 

560 indata=None, 

561 vim_info=None, 

562 target_record_id=None, 

563 ) 

564 

565 self.assertDictEqual(expected_result, result) 

566 

567 def test__process_image_params_with_vim_image_id(self): 

568 expected_result = { 

569 "find_params": { 

570 "filter_dict": { 

571 "id": "123456", 

572 }, 

573 }, 

574 } 

575 target_image = { 

576 "vim_image_id": "123456", 

577 } 

578 

579 result = Ns._process_image_params( 

580 target_image=target_image, 

581 indata=None, 

582 vim_info=None, 

583 target_record_id=None, 

584 ) 

585 

586 self.assertDictEqual(expected_result, result) 

587 

588 def test__process_image_params_with_image_checksum(self): 

589 expected_result = { 

590 "find_params": { 

591 "filter_dict": { 

592 "checksum": "e3fc50a88d0a364313df4b21ef20c29e", 

593 }, 

594 }, 

595 } 

596 target_image = { 

597 "image_checksum": "e3fc50a88d0a364313df4b21ef20c29e", 

598 } 

599 

600 result = Ns._process_image_params( 

601 target_image=target_image, 

602 indata=None, 

603 vim_info=None, 

604 target_record_id=None, 

605 ) 

606 

607 self.assertDictEqual(expected_result, result) 

608 

609 def test__get_resource_allocation_params_with_empty_target_image(self): 

610 expected_result = {} 

611 quota_descriptor = {} 

612 

613 result = Ns._get_resource_allocation_params( 

614 quota_descriptor=quota_descriptor, 

615 ) 

616 

617 self.assertDictEqual(expected_result, result) 

618 

619 def test__get_resource_allocation_params_with_wrong_target_image(self): 

620 expected_result = {} 

621 quota_descriptor = { 

622 "no_quota": "present_here", 

623 } 

624 

625 result = Ns._get_resource_allocation_params( 

626 quota_descriptor=quota_descriptor, 

627 ) 

628 

629 self.assertDictEqual(expected_result, result) 

630 

631 def test__get_resource_allocation_params_with_limit(self): 

632 expected_result = { 

633 "limit": 10, 

634 } 

635 quota_descriptor = { 

636 "limit": "10", 

637 } 

638 

639 result = Ns._get_resource_allocation_params( 

640 quota_descriptor=quota_descriptor, 

641 ) 

642 

643 self.assertDictEqual(expected_result, result) 

644 

645 def test__get_resource_allocation_params_with_reserve(self): 

646 expected_result = { 

647 "reserve": 20, 

648 } 

649 quota_descriptor = { 

650 "reserve": "20", 

651 } 

652 

653 result = Ns._get_resource_allocation_params( 

654 quota_descriptor=quota_descriptor, 

655 ) 

656 

657 self.assertDictEqual(expected_result, result) 

658 

659 def test__get_resource_allocation_params_with_shares(self): 

660 expected_result = { 

661 "shares": 30, 

662 } 

663 quota_descriptor = { 

664 "shares": "30", 

665 } 

666 

667 result = Ns._get_resource_allocation_params( 

668 quota_descriptor=quota_descriptor, 

669 ) 

670 

671 self.assertDictEqual(expected_result, result) 

672 

673 def test__get_resource_allocation_params(self): 

674 expected_result = { 

675 "limit": 10, 

676 "reserve": 20, 

677 "shares": 30, 

678 } 

679 quota_descriptor = { 

680 "limit": "10", 

681 "reserve": "20", 

682 "shares": "30", 

683 } 

684 

685 result = Ns._get_resource_allocation_params( 

686 quota_descriptor=quota_descriptor, 

687 ) 

688 

689 self.assertDictEqual(expected_result, result) 

690 

691 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

692 def test__process_guest_epa_quota_params_with_empty_quota_epa_cpu( 

693 self, 

694 resource_allocation, 

695 ): 

696 expected_result = {} 

697 guest_epa_quota = {} 

698 epa_vcpu_set = True 

699 

700 result = Ns._process_guest_epa_quota_params( 

701 guest_epa_quota=guest_epa_quota, 

702 epa_vcpu_set=epa_vcpu_set, 

703 ) 

704 

705 self.assertDictEqual(expected_result, result) 

706 self.assertFalse(resource_allocation.called) 

707 

708 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

709 def test__process_guest_epa_quota_params_with_empty_quota_false_epa_cpu( 

710 self, 

711 resource_allocation, 

712 ): 

713 expected_result = {} 

714 guest_epa_quota = {} 

715 epa_vcpu_set = False 

716 

717 result = Ns._process_guest_epa_quota_params( 

718 guest_epa_quota=guest_epa_quota, 

719 epa_vcpu_set=epa_vcpu_set, 

720 ) 

721 

722 self.assertDictEqual(expected_result, result) 

723 self.assertFalse(resource_allocation.called) 

724 

725 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

726 def test__process_guest_epa_quota_params_with_wrong_quota_epa_cpu( 

727 self, 

728 resource_allocation, 

729 ): 

730 expected_result = {} 

731 guest_epa_quota = { 

732 "no-quota": "nothing", 

733 } 

734 epa_vcpu_set = True 

735 

736 result = Ns._process_guest_epa_quota_params( 

737 guest_epa_quota=guest_epa_quota, 

738 epa_vcpu_set=epa_vcpu_set, 

739 ) 

740 

741 self.assertDictEqual(expected_result, result) 

742 self.assertFalse(resource_allocation.called) 

743 

744 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

745 def test__process_guest_epa_quota_params_with_wrong_quota_false_epa_cpu( 

746 self, 

747 resource_allocation, 

748 ): 

749 expected_result = {} 

750 guest_epa_quota = { 

751 "no-quota": "nothing", 

752 } 

753 epa_vcpu_set = False 

754 

755 result = Ns._process_guest_epa_quota_params( 

756 guest_epa_quota=guest_epa_quota, 

757 epa_vcpu_set=epa_vcpu_set, 

758 ) 

759 

760 self.assertDictEqual(expected_result, result) 

761 self.assertFalse(resource_allocation.called) 

762 

763 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

764 def test__process_guest_epa_quota_params_with_cpu_quota_epa_cpu( 

765 self, 

766 resource_allocation, 

767 ): 

768 expected_result = {} 

769 guest_epa_quota = { 

770 "cpu-quota": { 

771 "limit": "10", 

772 "reserve": "20", 

773 "shares": "30", 

774 }, 

775 } 

776 epa_vcpu_set = True 

777 

778 result = Ns._process_guest_epa_quota_params( 

779 guest_epa_quota=guest_epa_quota, 

780 epa_vcpu_set=epa_vcpu_set, 

781 ) 

782 

783 self.assertDictEqual(expected_result, result) 

784 self.assertFalse(resource_allocation.called) 

785 

786 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

787 def test__process_guest_epa_quota_params_with_cpu_quota_false_epa_cpu( 

788 self, 

789 resource_allocation, 

790 ): 

791 expected_result = { 

792 "cpu-quota": { 

793 "limit": 10, 

794 "reserve": 20, 

795 "shares": 30, 

796 }, 

797 } 

798 guest_epa_quota = { 

799 "cpu-quota": { 

800 "limit": "10", 

801 "reserve": "20", 

802 "shares": "30", 

803 }, 

804 } 

805 epa_vcpu_set = False 

806 

807 resource_allocation_param = { 

808 "limit": "10", 

809 "reserve": "20", 

810 "shares": "30", 

811 } 

812 resource_allocation.return_value = { 

813 "limit": 10, 

814 "reserve": 20, 

815 "shares": 30, 

816 } 

817 

818 result = Ns._process_guest_epa_quota_params( 

819 guest_epa_quota=guest_epa_quota, 

820 epa_vcpu_set=epa_vcpu_set, 

821 ) 

822 

823 resource_allocation.assert_called_once_with(resource_allocation_param) 

824 self.assertDictEqual(expected_result, result) 

825 

826 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

827 def test__process_guest_epa_quota_params_with_mem_quota_epa_cpu( 

828 self, 

829 resource_allocation, 

830 ): 

831 expected_result = { 

832 "mem-quota": { 

833 "limit": 10, 

834 "reserve": 20, 

835 "shares": 30, 

836 }, 

837 } 

838 guest_epa_quota = { 

839 "mem-quota": { 

840 "limit": "10", 

841 "reserve": "20", 

842 "shares": "30", 

843 }, 

844 } 

845 epa_vcpu_set = True 

846 

847 resource_allocation_param = { 

848 "limit": "10", 

849 "reserve": "20", 

850 "shares": "30", 

851 } 

852 resource_allocation.return_value = { 

853 "limit": 10, 

854 "reserve": 20, 

855 "shares": 30, 

856 } 

857 

858 result = Ns._process_guest_epa_quota_params( 

859 guest_epa_quota=guest_epa_quota, 

860 epa_vcpu_set=epa_vcpu_set, 

861 ) 

862 

863 resource_allocation.assert_called_once_with(resource_allocation_param) 

864 self.assertDictEqual(expected_result, result) 

865 

866 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

867 def test__process_guest_epa_quota_params_with_mem_quota_false_epa_cpu( 

868 self, 

869 resource_allocation, 

870 ): 

871 expected_result = { 

872 "mem-quota": { 

873 "limit": 10, 

874 "reserve": 20, 

875 "shares": 30, 

876 }, 

877 } 

878 guest_epa_quota = { 

879 "mem-quota": { 

880 "limit": "10", 

881 "reserve": "20", 

882 "shares": "30", 

883 }, 

884 } 

885 epa_vcpu_set = False 

886 

887 resource_allocation_param = { 

888 "limit": "10", 

889 "reserve": "20", 

890 "shares": "30", 

891 } 

892 resource_allocation.return_value = { 

893 "limit": 10, 

894 "reserve": 20, 

895 "shares": 30, 

896 } 

897 

898 result = Ns._process_guest_epa_quota_params( 

899 guest_epa_quota=guest_epa_quota, 

900 epa_vcpu_set=epa_vcpu_set, 

901 ) 

902 

903 resource_allocation.assert_called_once_with(resource_allocation_param) 

904 self.assertDictEqual(expected_result, result) 

905 

906 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

907 def test__process_guest_epa_quota_params_with_disk_io_quota_epa_cpu( 

908 self, 

909 resource_allocation, 

910 ): 

911 expected_result = { 

912 "disk-io-quota": { 

913 "limit": 10, 

914 "reserve": 20, 

915 "shares": 30, 

916 }, 

917 } 

918 guest_epa_quota = { 

919 "disk-io-quota": { 

920 "limit": "10", 

921 "reserve": "20", 

922 "shares": "30", 

923 }, 

924 } 

925 epa_vcpu_set = True 

926 

927 resource_allocation_param = { 

928 "limit": "10", 

929 "reserve": "20", 

930 "shares": "30", 

931 } 

932 resource_allocation.return_value = { 

933 "limit": 10, 

934 "reserve": 20, 

935 "shares": 30, 

936 } 

937 

938 result = Ns._process_guest_epa_quota_params( 

939 guest_epa_quota=guest_epa_quota, 

940 epa_vcpu_set=epa_vcpu_set, 

941 ) 

942 

943 resource_allocation.assert_called_once_with(resource_allocation_param) 

944 self.assertDictEqual(expected_result, result) 

945 

946 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

947 def test__process_guest_epa_quota_params_with_disk_io_quota_false_epa_cpu( 

948 self, 

949 resource_allocation, 

950 ): 

951 expected_result = { 

952 "disk-io-quota": { 

953 "limit": 10, 

954 "reserve": 20, 

955 "shares": 30, 

956 }, 

957 } 

958 guest_epa_quota = { 

959 "disk-io-quota": { 

960 "limit": "10", 

961 "reserve": "20", 

962 "shares": "30", 

963 }, 

964 } 

965 epa_vcpu_set = False 

966 

967 resource_allocation_param = { 

968 "limit": "10", 

969 "reserve": "20", 

970 "shares": "30", 

971 } 

972 resource_allocation.return_value = { 

973 "limit": 10, 

974 "reserve": 20, 

975 "shares": 30, 

976 } 

977 

978 result = Ns._process_guest_epa_quota_params( 

979 guest_epa_quota=guest_epa_quota, 

980 epa_vcpu_set=epa_vcpu_set, 

981 ) 

982 

983 resource_allocation.assert_called_once_with(resource_allocation_param) 

984 self.assertDictEqual(expected_result, result) 

985 

986 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

987 def test__process_guest_epa_quota_params_with_vif_quota_epa_cpu( 

988 self, 

989 resource_allocation, 

990 ): 

991 expected_result = { 

992 "vif-quota": { 

993 "limit": 10, 

994 "reserve": 20, 

995 "shares": 30, 

996 }, 

997 } 

998 guest_epa_quota = { 

999 "vif-quota": { 

1000 "limit": "10", 

1001 "reserve": "20", 

1002 "shares": "30", 

1003 }, 

1004 } 

1005 epa_vcpu_set = True 

1006 

1007 resource_allocation_param = { 

1008 "limit": "10", 

1009 "reserve": "20", 

1010 "shares": "30", 

1011 } 

1012 resource_allocation.return_value = { 

1013 "limit": 10, 

1014 "reserve": 20, 

1015 "shares": 30, 

1016 } 

1017 

1018 result = Ns._process_guest_epa_quota_params( 

1019 guest_epa_quota=guest_epa_quota, 

1020 epa_vcpu_set=epa_vcpu_set, 

1021 ) 

1022 

1023 resource_allocation.assert_called_once_with(resource_allocation_param) 

1024 self.assertDictEqual(expected_result, result) 

1025 

1026 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

1027 def test__process_guest_epa_quota_params_with_vif_quota_false_epa_cpu( 

1028 self, 

1029 resource_allocation, 

1030 ): 

1031 expected_result = { 

1032 "vif-quota": { 

1033 "limit": 10, 

1034 "reserve": 20, 

1035 "shares": 30, 

1036 }, 

1037 } 

1038 guest_epa_quota = { 

1039 "vif-quota": { 

1040 "limit": "10", 

1041 "reserve": "20", 

1042 "shares": "30", 

1043 }, 

1044 } 

1045 epa_vcpu_set = False 

1046 

1047 resource_allocation_param = { 

1048 "limit": "10", 

1049 "reserve": "20", 

1050 "shares": "30", 

1051 } 

1052 resource_allocation.return_value = { 

1053 "limit": 10, 

1054 "reserve": 20, 

1055 "shares": 30, 

1056 } 

1057 

1058 result = Ns._process_guest_epa_quota_params( 

1059 guest_epa_quota=guest_epa_quota, 

1060 epa_vcpu_set=epa_vcpu_set, 

1061 ) 

1062 

1063 resource_allocation.assert_called_once_with(resource_allocation_param) 

1064 self.assertDictEqual(expected_result, result) 

1065 

1066 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

1067 def test__process_guest_epa_quota_params_with_quota_epa_cpu( 

1068 self, 

1069 resource_allocation, 

1070 ): 

1071 expected_result = { 

1072 "mem-quota": { 

1073 "limit": 10, 

1074 "reserve": 20, 

1075 "shares": 30, 

1076 }, 

1077 "disk-io-quota": { 

1078 "limit": 10, 

1079 "reserve": 20, 

1080 "shares": 30, 

1081 }, 

1082 "vif-quota": { 

1083 "limit": 10, 

1084 "reserve": 20, 

1085 "shares": 30, 

1086 }, 

1087 } 

1088 guest_epa_quota = { 

1089 "cpu-quota": { 

1090 "limit": "10", 

1091 "reserve": "20", 

1092 "shares": "30", 

1093 }, 

1094 "mem-quota": { 

1095 "limit": "10", 

1096 "reserve": "20", 

1097 "shares": "30", 

1098 }, 

1099 "disk-io-quota": { 

1100 "limit": "10", 

1101 "reserve": "20", 

1102 "shares": "30", 

1103 }, 

1104 "vif-quota": { 

1105 "limit": "10", 

1106 "reserve": "20", 

1107 "shares": "30", 

1108 }, 

1109 } 

1110 epa_vcpu_set = True 

1111 

1112 resource_allocation.return_value = { 

1113 "limit": 10, 

1114 "reserve": 20, 

1115 "shares": 30, 

1116 } 

1117 

1118 result = Ns._process_guest_epa_quota_params( 

1119 guest_epa_quota=guest_epa_quota, 

1120 epa_vcpu_set=epa_vcpu_set, 

1121 ) 

1122 

1123 self.assertTrue(resource_allocation.called) 

1124 self.assertDictEqual(expected_result, result) 

1125 

1126 @patch("osm_ng_ro.ns.Ns._get_resource_allocation_params") 

1127 def test__process_guest_epa_quota_params_with_quota_epa_cpu_no_set( 

1128 self, 

1129 resource_allocation, 

1130 ): 

1131 expected_result = { 

1132 "cpu-quota": { 

1133 "limit": 10, 

1134 "reserve": 20, 

1135 "shares": 30, 

1136 }, 

1137 "mem-quota": { 

1138 "limit": 10, 

1139 "reserve": 20, 

1140 "shares": 30, 

1141 }, 

1142 "disk-io-quota": { 

1143 "limit": 10, 

1144 "reserve": 20, 

1145 "shares": 30, 

1146 }, 

1147 "vif-quota": { 

1148 "limit": 10, 

1149 "reserve": 20, 

1150 "shares": 30, 

1151 }, 

1152 } 

1153 guest_epa_quota = { 

1154 "cpu-quota": { 

1155 "limit": "10", 

1156 "reserve": "20", 

1157 "shares": "30", 

1158 }, 

1159 "mem-quota": { 

1160 "limit": "10", 

1161 "reserve": "20", 

1162 "shares": "30", 

1163 }, 

1164 "disk-io-quota": { 

1165 "limit": "10", 

1166 "reserve": "20", 

1167 "shares": "30", 

1168 }, 

1169 "vif-quota": { 

1170 "limit": "10", 

1171 "reserve": "20", 

1172 "shares": "30", 

1173 }, 

1174 } 

1175 epa_vcpu_set = False 

1176 

1177 resource_allocation.return_value = { 

1178 "limit": 10, 

1179 "reserve": 20, 

1180 "shares": 30, 

1181 } 

1182 

1183 result = Ns._process_guest_epa_quota_params( 

1184 guest_epa_quota=guest_epa_quota, 

1185 epa_vcpu_set=epa_vcpu_set, 

1186 ) 

1187 

1188 self.assertTrue(resource_allocation.called) 

1189 self.assertDictEqual(expected_result, result) 

1190 

1191 def test__process_guest_epa_numa_params_with_empty_numa_params(self): 

1192 expected_numa_result = [] 

1193 expected_epa_vcpu_set_result = False 

1194 guest_epa_quota = {} 

1195 

1196 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1197 guest_epa_quota=guest_epa_quota, 

1198 ) 

1199 self.assertEqual(expected_numa_result, numa_result) 

1200 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1201 

1202 def test__process_guest_epa_numa_params_with_wrong_numa_params(self): 

1203 expected_numa_result = [] 

1204 expected_epa_vcpu_set_result = False 

1205 guest_epa_quota = {"no_nume": "here"} 

1206 

1207 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1208 guest_epa_quota=guest_epa_quota, 

1209 ) 

1210 

1211 self.assertEqual(expected_numa_result, numa_result) 

1212 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1213 

1214 def test__process_guest_epa_numa_params_with_numa_node_policy(self): 

1215 expected_numa_result = [] 

1216 expected_epa_vcpu_set_result = False 

1217 guest_epa_quota = {"numa-node-policy": {}} 

1218 

1219 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1220 guest_epa_quota=guest_epa_quota, 

1221 ) 

1222 

1223 self.assertEqual(expected_numa_result, numa_result) 

1224 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1225 

1226 def test__process_guest_epa_numa_params_with_no_node(self): 

1227 expected_numa_result = [] 

1228 expected_epa_vcpu_set_result = False 

1229 guest_epa_quota = { 

1230 "numa-node-policy": { 

1231 "node": [], 

1232 }, 

1233 } 

1234 

1235 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1236 guest_epa_quota=guest_epa_quota, 

1237 ) 

1238 

1239 self.assertEqual(expected_numa_result, numa_result) 

1240 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1241 

1242 def test__process_guest_epa_numa_params_with_1_node_num_cores(self): 

1243 expected_numa_result = [{"cores": 3}] 

1244 expected_epa_vcpu_set_result = True 

1245 guest_epa_quota = { 

1246 "numa-node-policy": { 

1247 "node": [ 

1248 { 

1249 "num-cores": 3, 

1250 }, 

1251 ], 

1252 }, 

1253 } 

1254 

1255 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1256 guest_epa_quota=guest_epa_quota, 

1257 ) 

1258 

1259 self.assertEqual(expected_numa_result, numa_result) 

1260 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1261 

1262 def test__process_guest_epa_numa_params_with_1_node_paired_threads(self): 

1263 expected_numa_result = [{"paired_threads": 3}] 

1264 expected_epa_vcpu_set_result = True 

1265 guest_epa_quota = { 

1266 "numa-node-policy": { 

1267 "node": [ 

1268 { 

1269 "paired-threads": {"num-paired-threads": "3"}, 

1270 }, 

1271 ], 

1272 }, 

1273 } 

1274 

1275 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1276 guest_epa_quota=guest_epa_quota, 

1277 ) 

1278 

1279 self.assertEqual(expected_numa_result, numa_result) 

1280 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1281 

1282 def test__process_guest_epa_numa_params_with_1_node_paired_threads_ids(self): 

1283 expected_numa_result = [ 

1284 { 

1285 "paired-threads-id": [("0", "1"), ("4", "5")], 

1286 } 

1287 ] 

1288 expected_epa_vcpu_set_result = False 

1289 guest_epa_quota = { 

1290 "numa-node-policy": { 

1291 "node": [ 

1292 { 

1293 "paired-threads": { 

1294 "paired-thread-ids": [ 

1295 { 

1296 "thread-a": 0, 

1297 "thread-b": 1, 

1298 }, 

1299 { 

1300 "thread-a": 4, 

1301 "thread-b": 5, 

1302 }, 

1303 ], 

1304 }, 

1305 }, 

1306 ], 

1307 }, 

1308 } 

1309 

1310 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1311 guest_epa_quota=guest_epa_quota, 

1312 ) 

1313 

1314 self.assertEqual(expected_numa_result, numa_result) 

1315 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1316 

1317 def test__process_guest_epa_numa_params_with_1_node_num_threads(self): 

1318 expected_numa_result = [{"threads": 3}] 

1319 expected_epa_vcpu_set_result = True 

1320 guest_epa_quota = { 

1321 "numa-node-policy": { 

1322 "node": [ 

1323 { 

1324 "num-threads": "3", 

1325 }, 

1326 ], 

1327 }, 

1328 } 

1329 

1330 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1331 guest_epa_quota=guest_epa_quota, 

1332 ) 

1333 

1334 self.assertEqual(expected_numa_result, numa_result) 

1335 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1336 

1337 def test__process_guest_epa_numa_params_with_1_node_memory_mb(self): 

1338 expected_numa_result = [{"memory": 2}] 

1339 expected_epa_vcpu_set_result = False 

1340 guest_epa_quota = { 

1341 "numa-node-policy": { 

1342 "node": [ 

1343 { 

1344 "memory-mb": 2048, 

1345 }, 

1346 ], 

1347 }, 

1348 } 

1349 

1350 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1351 guest_epa_quota=guest_epa_quota, 

1352 ) 

1353 

1354 self.assertEqual(expected_numa_result, numa_result) 

1355 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1356 

1357 def test__process_guest_epa_numa_params_with_1_node_vcpu(self): 

1358 expected_numa_result = [ 

1359 { 

1360 "id": 0, 

1361 "vcpu": [0, 1], 

1362 } 

1363 ] 

1364 expected_epa_vcpu_set_result = False 

1365 guest_epa_quota = { 

1366 "numa-node-policy": { 

1367 "node": [{"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}], 

1368 }, 

1369 } 

1370 

1371 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1372 guest_epa_quota=guest_epa_quota, 

1373 ) 

1374 

1375 self.assertEqual(expected_numa_result, numa_result) 

1376 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1377 

1378 def test__process_guest_epa_numa_params_with_2_node_vcpu(self): 

1379 expected_numa_result = [ 

1380 { 

1381 "id": 0, 

1382 "vcpu": [0, 1], 

1383 }, 

1384 { 

1385 "id": 1, 

1386 "vcpu": [2, 3], 

1387 }, 

1388 ] 

1389 

1390 expected_epa_vcpu_set_result = False 

1391 guest_epa_quota = { 

1392 "numa-node-policy": { 

1393 "node": [ 

1394 {"id": "0", "vcpu": [{"id": "0"}, {"id": "1"}]}, 

1395 {"id": "1", "vcpu": [{"id": "2"}, {"id": "3"}]}, 

1396 ], 

1397 }, 

1398 } 

1399 

1400 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1401 guest_epa_quota=guest_epa_quota, 

1402 ) 

1403 

1404 self.assertEqual(expected_numa_result, numa_result) 

1405 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1406 

1407 def test__process_guest_epa_numa_params_with_1_node(self): 

1408 expected_numa_result = [ 

1409 { 

1410 # "id": 0, 

1411 # "vcpu": [0, 1], 

1412 "cores": 3, 

1413 "paired_threads": 3, 

1414 "paired-threads-id": [("0", "1"), ("4", "5")], 

1415 "threads": 3, 

1416 "memory": 2, 

1417 } 

1418 ] 

1419 expected_epa_vcpu_set_result = True 

1420 guest_epa_quota = { 

1421 "numa-node-policy": { 

1422 "node": [ 

1423 { 

1424 "num-cores": 3, 

1425 "paired-threads": { 

1426 "num-paired-threads": "3", 

1427 "paired-thread-ids": [ 

1428 { 

1429 "thread-a": 0, 

1430 "thread-b": 1, 

1431 }, 

1432 { 

1433 "thread-a": 4, 

1434 "thread-b": 5, 

1435 }, 

1436 ], 

1437 }, 

1438 "num-threads": "3", 

1439 "memory-mb": 2048, 

1440 }, 

1441 ], 

1442 }, 

1443 } 

1444 

1445 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1446 guest_epa_quota=guest_epa_quota, 

1447 ) 

1448 

1449 self.assertEqual(expected_numa_result, numa_result) 

1450 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1451 

1452 def test__process_guest_epa_numa_params_with_2_nodes(self): 

1453 expected_numa_result = [ 

1454 { 

1455 "cores": 3, 

1456 "paired_threads": 3, 

1457 "paired-threads-id": [("0", "1"), ("4", "5")], 

1458 "threads": 3, 

1459 "memory": 2, 

1460 }, 

1461 { 

1462 "cores": 7, 

1463 "paired_threads": 7, 

1464 "paired-threads-id": [("2", "3"), ("5", "6")], 

1465 "threads": 4, 

1466 "memory": 4, 

1467 }, 

1468 ] 

1469 expected_epa_vcpu_set_result = True 

1470 guest_epa_quota = { 

1471 "numa-node-policy": { 

1472 "node": [ 

1473 { 

1474 "num-cores": 3, 

1475 "paired-threads": { 

1476 "num-paired-threads": "3", 

1477 "paired-thread-ids": [ 

1478 { 

1479 "thread-a": 0, 

1480 "thread-b": 1, 

1481 }, 

1482 { 

1483 "thread-a": 4, 

1484 "thread-b": 5, 

1485 }, 

1486 ], 

1487 }, 

1488 "num-threads": "3", 

1489 "memory-mb": 2048, 

1490 }, 

1491 { 

1492 "num-cores": 7, 

1493 "paired-threads": { 

1494 "num-paired-threads": "7", 

1495 "paired-thread-ids": [ 

1496 { 

1497 "thread-a": 2, 

1498 "thread-b": 3, 

1499 }, 

1500 { 

1501 "thread-a": 5, 

1502 "thread-b": 6, 

1503 }, 

1504 ], 

1505 }, 

1506 "num-threads": "4", 

1507 "memory-mb": 4096, 

1508 }, 

1509 ], 

1510 }, 

1511 } 

1512 

1513 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_numa_params( 

1514 guest_epa_quota=guest_epa_quota, 

1515 ) 

1516 

1517 self.assertEqual(expected_numa_result, numa_result) 

1518 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1519 

1520 def test__process_guest_epa_cpu_pinning_params_with_empty_params(self): 

1521 expected_numa_result = {} 

1522 expected_epa_vcpu_set_result = False 

1523 guest_epa_quota = {} 

1524 vcpu_count = 0 

1525 epa_vcpu_set = False 

1526 

1527 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1528 guest_epa_quota=guest_epa_quota, 

1529 vcpu_count=vcpu_count, 

1530 epa_vcpu_set=epa_vcpu_set, 

1531 ) 

1532 

1533 self.assertDictEqual(expected_numa_result, numa_result) 

1534 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1535 

1536 def test__process_guest_epa_cpu_pinning_params_with_wrong_params(self): 

1537 expected_numa_result = {} 

1538 expected_epa_vcpu_set_result = False 

1539 guest_epa_quota = { 

1540 "no-cpu-pinning-policy": "here", 

1541 } 

1542 vcpu_count = 0 

1543 epa_vcpu_set = False 

1544 

1545 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1546 guest_epa_quota=guest_epa_quota, 

1547 vcpu_count=vcpu_count, 

1548 epa_vcpu_set=epa_vcpu_set, 

1549 ) 

1550 

1551 self.assertDictEqual(expected_numa_result, numa_result) 

1552 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1553 

1554 def test__process_guest_epa_cpu_pinning_params_with_epa_vcpu_set(self): 

1555 expected_numa_result = {} 

1556 expected_epa_vcpu_set_result = True 

1557 guest_epa_quota = { 

1558 "cpu-pinning-policy": "DEDICATED", 

1559 } 

1560 vcpu_count = 0 

1561 epa_vcpu_set = True 

1562 

1563 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1564 guest_epa_quota=guest_epa_quota, 

1565 vcpu_count=vcpu_count, 

1566 epa_vcpu_set=epa_vcpu_set, 

1567 ) 

1568 

1569 self.assertDictEqual(expected_numa_result, numa_result) 

1570 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1571 

1572 def test__process_guest_epa_cpu_pinning_params_with_policy_prefer(self): 

1573 expected_numa_result = {"threads": 3} 

1574 expected_epa_vcpu_set_result = True 

1575 guest_epa_quota = { 

1576 "cpu-pinning-policy": "DEDICATED", 

1577 "cpu-thread-pinning-policy": "PREFER", 

1578 } 

1579 vcpu_count = 3 

1580 epa_vcpu_set = False 

1581 

1582 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1583 guest_epa_quota=guest_epa_quota, 

1584 vcpu_count=vcpu_count, 

1585 epa_vcpu_set=epa_vcpu_set, 

1586 ) 

1587 

1588 self.assertDictEqual(expected_numa_result, numa_result) 

1589 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1590 

1591 def test__process_guest_epa_cpu_pinning_params_with_policy_isolate(self): 

1592 expected_numa_result = {"cores": 3} 

1593 expected_epa_vcpu_set_result = True 

1594 guest_epa_quota = { 

1595 "cpu-pinning-policy": "DEDICATED", 

1596 "cpu-thread-pinning-policy": "ISOLATE", 

1597 } 

1598 vcpu_count = 3 

1599 epa_vcpu_set = False 

1600 

1601 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1602 guest_epa_quota=guest_epa_quota, 

1603 vcpu_count=vcpu_count, 

1604 epa_vcpu_set=epa_vcpu_set, 

1605 ) 

1606 

1607 self.assertDictEqual(expected_numa_result, numa_result) 

1608 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1609 

1610 def test__process_guest_epa_cpu_pinning_params_with_policy_require(self): 

1611 expected_numa_result = {"threads": 3} 

1612 expected_epa_vcpu_set_result = True 

1613 guest_epa_quota = { 

1614 "cpu-pinning-policy": "DEDICATED", 

1615 "cpu-thread-pinning-policy": "REQUIRE", 

1616 } 

1617 vcpu_count = 3 

1618 epa_vcpu_set = False 

1619 

1620 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1621 guest_epa_quota=guest_epa_quota, 

1622 vcpu_count=vcpu_count, 

1623 epa_vcpu_set=epa_vcpu_set, 

1624 ) 

1625 

1626 self.assertDictEqual(expected_numa_result, numa_result) 

1627 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1628 

1629 def test__process_guest_epa_cpu_pinning_params(self): 

1630 expected_numa_result = {"threads": 3} 

1631 expected_epa_vcpu_set_result = True 

1632 guest_epa_quota = { 

1633 "cpu-pinning-policy": "DEDICATED", 

1634 } 

1635 vcpu_count = 3 

1636 epa_vcpu_set = False 

1637 

1638 numa_result, epa_vcpu_set_result = Ns._process_guest_epa_cpu_pinning_params( 

1639 guest_epa_quota=guest_epa_quota, 

1640 vcpu_count=vcpu_count, 

1641 epa_vcpu_set=epa_vcpu_set, 

1642 ) 

1643 

1644 self.assertDictEqual(expected_numa_result, numa_result) 

1645 self.assertEqual(expected_epa_vcpu_set_result, epa_vcpu_set_result) 

1646 

1647 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params") 

1648 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params") 

1649 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params") 

1650 def test__process_guest_epa_params_with_empty_params( 

1651 self, 

1652 guest_epa_numa_params, 

1653 guest_epa_cpu_pinning_params, 

1654 guest_epa_quota_params, 

1655 ): 

1656 expected_result = {} 

1657 target_flavor = {} 

1658 

1659 result = Ns._process_epa_params( 

1660 target_flavor=target_flavor, 

1661 ) 

1662 

1663 self.assertDictEqual(expected_result, result) 

1664 self.assertFalse(guest_epa_numa_params.called) 

1665 self.assertFalse(guest_epa_cpu_pinning_params.called) 

1666 self.assertFalse(guest_epa_quota_params.called) 

1667 

1668 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params") 

1669 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params") 

1670 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params") 

1671 def test__process_guest_epa_params_with_wrong_params( 

1672 self, 

1673 guest_epa_numa_params, 

1674 guest_epa_cpu_pinning_params, 

1675 guest_epa_quota_params, 

1676 ): 

1677 expected_result = {} 

1678 target_flavor = { 

1679 "no-guest-epa": "here", 

1680 } 

1681 

1682 result = Ns._process_epa_params( 

1683 target_flavor=target_flavor, 

1684 ) 

1685 

1686 self.assertDictEqual(expected_result, result) 

1687 self.assertFalse(guest_epa_numa_params.called) 

1688 self.assertFalse(guest_epa_cpu_pinning_params.called) 

1689 self.assertFalse(guest_epa_quota_params.called) 

1690 

1691 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params") 

1692 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params") 

1693 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params") 

1694 def test__process_guest_epa_params( 

1695 self, 

1696 guest_epa_numa_params, 

1697 guest_epa_cpu_pinning_params, 

1698 guest_epa_quota_params, 

1699 ): 

1700 expected_result = { 

1701 "mem-policy": "STRICT", 

1702 } 

1703 target_flavor = { 

1704 "guest-epa": { 

1705 "vcpu-count": 1, 

1706 "numa-node-policy": { 

1707 "mem-policy": "STRICT", 

1708 }, 

1709 }, 

1710 } 

1711 

1712 guest_epa_numa_params.return_value = ({}, False) 

1713 guest_epa_cpu_pinning_params.return_value = ({}, False) 

1714 guest_epa_quota_params.return_value = {} 

1715 

1716 result = Ns._process_epa_params( 

1717 target_flavor=target_flavor, 

1718 ) 

1719 

1720 self.assertDictEqual(expected_result, result) 

1721 self.assertTrue(guest_epa_numa_params.called) 

1722 self.assertTrue(guest_epa_cpu_pinning_params.called) 

1723 self.assertTrue(guest_epa_quota_params.called) 

1724 

1725 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params") 

1726 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params") 

1727 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params") 

1728 def test__process_guest_epa_params_with_mempage_size( 

1729 self, 

1730 guest_epa_numa_params, 

1731 guest_epa_cpu_pinning_params, 

1732 guest_epa_quota_params, 

1733 ): 

1734 expected_result = { 

1735 "mempage-size": "1G", 

1736 "mem-policy": "STRICT", 

1737 } 

1738 target_flavor = { 

1739 "guest-epa": { 

1740 "vcpu-count": 1, 

1741 "mempage-size": "1G", 

1742 "numa-node-policy": { 

1743 "mem-policy": "STRICT", 

1744 }, 

1745 }, 

1746 } 

1747 

1748 guest_epa_numa_params.return_value = ({}, False) 

1749 guest_epa_cpu_pinning_params.return_value = ({}, False) 

1750 guest_epa_quota_params.return_value = {} 

1751 

1752 result = Ns._process_epa_params( 

1753 target_flavor=target_flavor, 

1754 ) 

1755 

1756 self.assertDictEqual(expected_result, result) 

1757 self.assertTrue(guest_epa_numa_params.called) 

1758 self.assertTrue(guest_epa_cpu_pinning_params.called) 

1759 self.assertTrue(guest_epa_quota_params.called) 

1760 

1761 @patch("osm_ng_ro.ns.Ns._process_guest_epa_quota_params") 

1762 @patch("osm_ng_ro.ns.Ns._process_guest_epa_cpu_pinning_params") 

1763 @patch("osm_ng_ro.ns.Ns._process_guest_epa_numa_params") 

1764 def test__process_guest_epa_params_with_numa( 

1765 self, 

1766 guest_epa_numa_params, 

1767 guest_epa_cpu_pinning_params, 

1768 guest_epa_quota_params, 

1769 ): 

1770 expected_result = { 

1771 "mempage-size": "1G", 

1772 "cpu-pinning-policy": "DEDICATED", 

1773 "cpu-thread-pinning-policy": "PREFER", 

1774 "numas": [ 

1775 { 

1776 "cores": 3, 

1777 "memory": 2, 

1778 "paired-threads": 3, 

1779 "paired-threads-id": [("0", "1"), ("4", "5")], 

1780 "threads": 3, 

1781 } 

1782 ], 

1783 "cpu-quota": {"limit": 10, "reserve": 20, "shares": 30}, 

1784 "disk-io-quota": {"limit": 10, "reserve": 20, "shares": 30}, 

1785 "mem-quota": {"limit": 10, "reserve": 20, "shares": 30}, 

1786 "vif-quota": {"limit": 10, "reserve": 20, "shares": 30}, 

1787 } 

1788 target_flavor = { 

1789 "guest-epa": { 

1790 "vcpu-count": 1, 

1791 "mempage-size": "1G", 

1792 "cpu-pinning-policy": "DEDICATED", 

1793 "cpu-thread-pinning-policy": "PREFER", 

1794 "numa-node-policy": { 

1795 "node": [ 

1796 { 

1797 "num-cores": 3, 

1798 "paired-threads": { 

1799 "num-paired-threads": "3", 

1800 "paired-thread-ids": [ 

1801 { 

1802 "thread-a": 0, 

1803 "thread-b": 1, 

1804 }, 

1805 { 

1806 "thread-a": 4, 

1807 "thread-b": 5, 

1808 }, 

1809 ], 

1810 }, 

1811 "num-threads": "3", 

1812 "memory-mb": 2048, 

1813 }, 

1814 ], 

1815 }, 

1816 "cpu-quota": { 

1817 "limit": "10", 

1818 "reserve": "20", 

1819 "shares": "30", 

1820 }, 

1821 "mem-quota": { 

1822 "limit": "10", 

1823 "reserve": "20", 

1824 "shares": "30", 

1825 }, 

1826 "disk-io-quota": { 

1827 "limit": "10", 

1828 "reserve": "20", 

1829 "shares": "30", 

1830 }, 

1831 "vif-quota": { 

1832 "limit": "10", 

1833 "reserve": "20", 

1834 "shares": "30", 

1835 }, 

1836 }, 

1837 } 

1838 

1839 guest_epa_numa_params.return_value = ( 

1840 [ 

1841 { 

1842 "cores": 3, 

1843 "paired-threads": 3, 

1844 "paired-threads-id": [("0", "1"), ("4", "5")], 

1845 "threads": 3, 

1846 "memory": 2, 

1847 }, 

1848 ], 

1849 True, 

1850 ) 

1851 guest_epa_cpu_pinning_params.return_value = ( 

1852 { 

1853 "threads": 3, 

1854 }, 

1855 True, 

1856 ) 

1857 guest_epa_quota_params.return_value = { 

1858 "cpu-quota": { 

1859 "limit": 10, 

1860 "reserve": 20, 

1861 "shares": 30, 

1862 }, 

1863 "mem-quota": { 

1864 "limit": 10, 

1865 "reserve": 20, 

1866 "shares": 30, 

1867 }, 

1868 "disk-io-quota": { 

1869 "limit": 10, 

1870 "reserve": 20, 

1871 "shares": 30, 

1872 }, 

1873 "vif-quota": { 

1874 "limit": 10, 

1875 "reserve": 20, 

1876 "shares": 30, 

1877 }, 

1878 } 

1879 

1880 result = Ns._process_epa_params( 

1881 target_flavor=target_flavor, 

1882 ) 

1883 self.assertEqual(expected_result, result) 

1884 self.assertTrue(guest_epa_numa_params.called) 

1885 self.assertTrue(guest_epa_cpu_pinning_params.called) 

1886 self.assertTrue(guest_epa_quota_params.called) 

1887 

1888 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

1889 def test__process_flavor_params_with_empty_target_flavor( 

1890 self, 

1891 epa_params, 

1892 ): 

1893 target_flavor = {} 

1894 indata = { 

1895 "vnf": [ 

1896 { 

1897 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

1898 }, 

1899 ], 

1900 } 

1901 vim_info = {} 

1902 target_record_id = "" 

1903 

1904 with self.assertRaises(KeyError): 

1905 Ns._process_flavor_params( 

1906 target_flavor=target_flavor, 

1907 indata=indata, 

1908 vim_info=vim_info, 

1909 target_record_id=target_record_id, 

1910 ) 

1911 

1912 self.assertFalse(epa_params.called) 

1913 

1914 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

1915 def test__process_flavor_params_with_wrong_target_flavor( 

1916 self, 

1917 epa_params, 

1918 ): 

1919 target_flavor = { 

1920 "no-target-flavor": "here", 

1921 } 

1922 indata = {} 

1923 vim_info = {} 

1924 target_record_id = "" 

1925 

1926 with self.assertRaises(KeyError): 

1927 Ns._process_flavor_params( 

1928 target_flavor=target_flavor, 

1929 indata=indata, 

1930 vim_info=vim_info, 

1931 target_record_id=target_record_id, 

1932 ) 

1933 

1934 self.assertFalse(epa_params.called) 

1935 

1936 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

1937 def test__process_flavor_params_with_empty_indata( 

1938 self, 

1939 epa_params, 

1940 ): 

1941 expected_result = { 

1942 "find_params": { 

1943 "flavor_data": { 

1944 "disk": 10, 

1945 "ram": 1024, 

1946 "vcpus": 2, 

1947 }, 

1948 }, 

1949 "params": { 

1950 "flavor_data": { 

1951 "disk": 10, 

1952 "name": "test", 

1953 "ram": 1024, 

1954 "vcpus": 2, 

1955 }, 

1956 }, 

1957 } 

1958 target_flavor = { 

1959 "name": "test", 

1960 "storage-gb": "10", 

1961 "memory-mb": "1024", 

1962 "vcpu-count": "2", 

1963 } 

1964 indata = {} 

1965 vim_info = {} 

1966 target_record_id = "" 

1967 

1968 epa_params.return_value = {} 

1969 

1970 result = Ns._process_flavor_params( 

1971 target_flavor=target_flavor, 

1972 indata=indata, 

1973 vim_info=vim_info, 

1974 target_record_id=target_record_id, 

1975 ) 

1976 

1977 self.assertTrue(epa_params.called) 

1978 self.assertDictEqual(result, expected_result) 

1979 

1980 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

1981 def test__process_flavor_params_with_wrong_indata( 

1982 self, 

1983 epa_params, 

1984 ): 

1985 expected_result = { 

1986 "find_params": { 

1987 "flavor_data": { 

1988 "disk": 10, 

1989 "ram": 1024, 

1990 "vcpus": 2, 

1991 }, 

1992 }, 

1993 "params": { 

1994 "flavor_data": { 

1995 "disk": 10, 

1996 "name": "test", 

1997 "ram": 1024, 

1998 "vcpus": 2, 

1999 }, 

2000 }, 

2001 } 

2002 target_flavor = { 

2003 "name": "test", 

2004 "storage-gb": "10", 

2005 "memory-mb": "1024", 

2006 "vcpu-count": "2", 

2007 } 

2008 indata = { 

2009 "no-vnf": "here", 

2010 } 

2011 vim_info = {} 

2012 target_record_id = "" 

2013 

2014 epa_params.return_value = {} 

2015 

2016 result = Ns._process_flavor_params( 

2017 target_flavor=target_flavor, 

2018 indata=indata, 

2019 vim_info=vim_info, 

2020 target_record_id=target_record_id, 

2021 ) 

2022 

2023 self.assertTrue(epa_params.called) 

2024 self.assertDictEqual(result, expected_result) 

2025 

2026 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

2027 def test__process_flavor_params_with_ephemeral_disk( 

2028 self, 

2029 epa_params, 

2030 ): 

2031 kwargs = { 

2032 "db": db, 

2033 } 

2034 

2035 db.get_one.return_value = { 

2036 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2037 "df": [ 

2038 { 

2039 "id": "default-df", 

2040 "vdu-profile": [ 

2041 {"id": "without_volumes-VM", "min-number-of-instances": 1} 

2042 ], 

2043 } 

2044 ], 

2045 "id": "without_volumes-vnf", 

2046 "product-name": "without_volumes-vnf", 

2047 "vdu": [ 

2048 { 

2049 "id": "without_volumes-VM", 

2050 "name": "without_volumes-VM", 

2051 "sw-image-desc": "ubuntu20.04", 

2052 "alternative-sw-image-desc": [ 

2053 "ubuntu20.04-aws", 

2054 "ubuntu20.04-azure", 

2055 ], 

2056 "virtual-storage-desc": ["root-volume", "ephemeral-volume"], 

2057 } 

2058 ], 

2059 "version": "1.0", 

2060 "virtual-storage-desc": [ 

2061 {"id": "root-volume", "size-of-storage": "10"}, 

2062 { 

2063 "id": "ephemeral-volume", 

2064 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

2065 "size-of-storage": "1", 

2066 }, 

2067 ], 

2068 "_admin": { 

2069 "storage": { 

2070 "fs": "mongo", 

2071 "path": "/app/storage/", 

2072 }, 

2073 "type": "vnfd", 

2074 }, 

2075 } 

2076 expected_result = { 

2077 "find_params": { 

2078 "flavor_data": { 

2079 "disk": 10, 

2080 "ram": 1024, 

2081 "vcpus": 2, 

2082 "ephemeral": 10, 

2083 }, 

2084 }, 

2085 "params": { 

2086 "flavor_data": { 

2087 "disk": 10, 

2088 "name": "test", 

2089 "ram": 1024, 

2090 "vcpus": 2, 

2091 "ephemeral": 10, 

2092 }, 

2093 }, 

2094 } 

2095 target_flavor = { 

2096 "id": "test_id", 

2097 "name": "test", 

2098 "storage-gb": "10", 

2099 "memory-mb": "1024", 

2100 "vcpu-count": "2", 

2101 } 

2102 indata = { 

2103 "vnf": [ 

2104 { 

2105 "vdur": [ 

2106 { 

2107 "ns-flavor-id": "test_id", 

2108 "virtual-storages": [ 

2109 { 

2110 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

2111 "size-of-storage": "10", 

2112 }, 

2113 ], 

2114 }, 

2115 ], 

2116 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2117 }, 

2118 ], 

2119 } 

2120 vim_info = {} 

2121 target_record_id = "" 

2122 

2123 epa_params.return_value = {} 

2124 

2125 result = Ns._process_flavor_params( 

2126 target_flavor=target_flavor, 

2127 indata=indata, 

2128 vim_info=vim_info, 

2129 target_record_id=target_record_id, 

2130 **kwargs, 

2131 ) 

2132 

2133 self.assertTrue(epa_params.called) 

2134 self.assertDictEqual(result, expected_result) 

2135 

2136 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

2137 def test__process_flavor_params_with_swap_disk( 

2138 self, 

2139 epa_params, 

2140 ): 

2141 expected_result = { 

2142 "find_params": { 

2143 "flavor_data": { 

2144 "disk": 10, 

2145 "ram": 1024, 

2146 "vcpus": 2, 

2147 "swap": 20, 

2148 }, 

2149 }, 

2150 "params": { 

2151 "flavor_data": { 

2152 "disk": 10, 

2153 "name": "test", 

2154 "ram": 1024, 

2155 "vcpus": 2, 

2156 "swap": 20, 

2157 }, 

2158 }, 

2159 } 

2160 target_flavor = { 

2161 "id": "test_id", 

2162 "name": "test", 

2163 "storage-gb": "10", 

2164 "memory-mb": "1024", 

2165 "vcpu-count": "2", 

2166 } 

2167 indata = { 

2168 "vnf": [ 

2169 { 

2170 "vdur": [ 

2171 { 

2172 "ns-flavor-id": "test_id", 

2173 "virtual-storages": [ 

2174 { 

2175 "type-of-storage": "etsi-nfv-descriptors:swap-storage", 

2176 "size-of-storage": "20", 

2177 }, 

2178 ], 

2179 }, 

2180 ], 

2181 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2182 }, 

2183 ], 

2184 } 

2185 vim_info = {} 

2186 target_record_id = "" 

2187 

2188 epa_params.return_value = {} 

2189 

2190 result = Ns._process_flavor_params( 

2191 target_flavor=target_flavor, 

2192 indata=indata, 

2193 vim_info=vim_info, 

2194 target_record_id=target_record_id, 

2195 ) 

2196 

2197 self.assertTrue(epa_params.called) 

2198 self.assertDictEqual(result, expected_result) 

2199 

2200 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

2201 def test__process_flavor_params_with_persistent_root_disk( 

2202 self, 

2203 epa_params, 

2204 ): 

2205 kwargs = { 

2206 "db": db, 

2207 } 

2208 

2209 db.get_one.return_value = { 

2210 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2211 "df": [ 

2212 { 

2213 "id": "default-df", 

2214 "vdu-profile": [ 

2215 {"id": "several_volumes-VM", "min-number-of-instances": 1} 

2216 ], 

2217 } 

2218 ], 

2219 "id": "several_volumes-vnf", 

2220 "product-name": "several_volumes-vnf", 

2221 "vdu": [ 

2222 { 

2223 "id": "several_volumes-VM", 

2224 "name": "several_volumes-VM", 

2225 "sw-image-desc": "ubuntu20.04", 

2226 "alternative-sw-image-desc": [ 

2227 "ubuntu20.04-aws", 

2228 "ubuntu20.04-azure", 

2229 ], 

2230 "virtual-storage-desc": [ 

2231 "persistent-root-volume", 

2232 ], 

2233 } 

2234 ], 

2235 "version": "1.0", 

2236 "virtual-storage-desc": [ 

2237 { 

2238 "id": "persistent-root-volume", 

2239 "type-of-storage": "persistent-storage:persistent-storage", 

2240 "size-of-storage": "10", 

2241 }, 

2242 ], 

2243 "_admin": { 

2244 "storage": { 

2245 "fs": "mongo", 

2246 "path": "/app/storage/", 

2247 }, 

2248 "type": "vnfd", 

2249 }, 

2250 } 

2251 expected_result = { 

2252 "find_params": { 

2253 "flavor_data": { 

2254 "disk": 0, 

2255 "ram": 1024, 

2256 "vcpus": 2, 

2257 }, 

2258 }, 

2259 "params": { 

2260 "flavor_data": { 

2261 "disk": 0, 

2262 "name": "test", 

2263 "ram": 1024, 

2264 "vcpus": 2, 

2265 }, 

2266 }, 

2267 } 

2268 target_flavor = { 

2269 "id": "test_id", 

2270 "name": "test", 

2271 "storage-gb": "10", 

2272 "memory-mb": "1024", 

2273 "vcpu-count": "2", 

2274 } 

2275 indata = { 

2276 "vnf": [ 

2277 { 

2278 "vdur": [ 

2279 { 

2280 "vdu-name": "several_volumes-VM", 

2281 "ns-flavor-id": "test_id", 

2282 "virtual-storages": [ 

2283 { 

2284 "type-of-storage": "persistent-storage:persistent-storage", 

2285 "size-of-storage": "10", 

2286 }, 

2287 ], 

2288 }, 

2289 ], 

2290 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2291 }, 

2292 ], 

2293 } 

2294 vim_info = {} 

2295 target_record_id = "" 

2296 

2297 epa_params.return_value = {} 

2298 

2299 result = Ns._process_flavor_params( 

2300 target_flavor=target_flavor, 

2301 indata=indata, 

2302 vim_info=vim_info, 

2303 target_record_id=target_record_id, 

2304 **kwargs, 

2305 ) 

2306 

2307 self.assertTrue(epa_params.called) 

2308 self.assertDictEqual(result, expected_result) 

2309 

2310 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

2311 def test__process_flavor_params_with_epa_params( 

2312 self, 

2313 epa_params, 

2314 ): 

2315 expected_result = { 

2316 "find_params": { 

2317 "flavor_data": { 

2318 "disk": 10, 

2319 "ram": 1024, 

2320 "vcpus": 2, 

2321 "extended": { 

2322 "numa": "there-is-numa-here", 

2323 }, 

2324 }, 

2325 }, 

2326 "params": { 

2327 "flavor_data": { 

2328 "disk": 10, 

2329 "name": "test", 

2330 "ram": 1024, 

2331 "vcpus": 2, 

2332 "extended": { 

2333 "numa": "there-is-numa-here", 

2334 }, 

2335 }, 

2336 }, 

2337 } 

2338 target_flavor = { 

2339 "id": "test_id", 

2340 "name": "test", 

2341 "storage-gb": "10", 

2342 "memory-mb": "1024", 

2343 "vcpu-count": "2", 

2344 } 

2345 indata = { 

2346 "vnf": [ 

2347 { 

2348 "vdur": [ 

2349 { 

2350 "ns-flavor-id": "test_id", 

2351 }, 

2352 ], 

2353 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2354 }, 

2355 ], 

2356 } 

2357 vim_info = {} 

2358 target_record_id = "" 

2359 

2360 epa_params.return_value = { 

2361 "numa": "there-is-numa-here", 

2362 } 

2363 

2364 result = Ns._process_flavor_params( 

2365 target_flavor=target_flavor, 

2366 indata=indata, 

2367 vim_info=vim_info, 

2368 target_record_id=target_record_id, 

2369 ) 

2370 

2371 self.assertTrue(epa_params.called) 

2372 self.assertDictEqual(result, expected_result) 

2373 

2374 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

2375 def test__process_flavor_params_with_vim_flavor_id( 

2376 self, 

2377 epa_params, 

2378 ): 

2379 expected_result = { 

2380 "find_params": { 

2381 "vim_flavor_id": "test.flavor", 

2382 }, 

2383 } 

2384 target_flavor = { 

2385 "id": "test_id", 

2386 "name": "test", 

2387 "storage-gb": "10", 

2388 "memory-mb": "1024", 

2389 "vcpu-count": "2", 

2390 } 

2391 indata = { 

2392 "vnf": [ 

2393 { 

2394 "vdur": [ 

2395 { 

2396 "ns-flavor-id": "test_id", 

2397 "additionalParams": { 

2398 "OSM": {"vim_flavor_id": "test.flavor"} 

2399 }, 

2400 }, 

2401 ], 

2402 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2403 }, 

2404 ], 

2405 } 

2406 vim_info = {} 

2407 target_record_id = "" 

2408 

2409 epa_params.return_value = {} 

2410 

2411 result = Ns._process_flavor_params( 

2412 target_flavor=target_flavor, 

2413 indata=indata, 

2414 vim_info=vim_info, 

2415 target_record_id=target_record_id, 

2416 ) 

2417 

2418 self.assertFalse(epa_params.called) 

2419 self.assertDictEqual(result, expected_result) 

2420 

2421 @patch("osm_ng_ro.ns.Ns._process_epa_params") 

2422 def test__process_flavor_params( 

2423 self, 

2424 epa_params, 

2425 ): 

2426 kwargs = { 

2427 "db": db, 

2428 } 

2429 

2430 db.get_one.return_value = { 

2431 "_id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2432 "df": [ 

2433 { 

2434 "id": "default-df", 

2435 "vdu-profile": [ 

2436 {"id": "without_volumes-VM", "min-number-of-instances": 1} 

2437 ], 

2438 } 

2439 ], 

2440 "id": "without_volumes-vnf", 

2441 "product-name": "without_volumes-vnf", 

2442 "vdu": [ 

2443 { 

2444 "id": "without_volumes-VM", 

2445 "name": "without_volumes-VM", 

2446 "sw-image-desc": "ubuntu20.04", 

2447 "alternative-sw-image-desc": [ 

2448 "ubuntu20.04-aws", 

2449 "ubuntu20.04-azure", 

2450 ], 

2451 "virtual-storage-desc": ["root-volume", "ephemeral-volume"], 

2452 } 

2453 ], 

2454 "version": "1.0", 

2455 "virtual-storage-desc": [ 

2456 {"id": "root-volume", "size-of-storage": "10"}, 

2457 { 

2458 "id": "ephemeral-volume", 

2459 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

2460 "size-of-storage": "1", 

2461 }, 

2462 ], 

2463 "_admin": { 

2464 "storage": { 

2465 "fs": "mongo", 

2466 "path": "/app/storage/", 

2467 }, 

2468 "type": "vnfd", 

2469 }, 

2470 } 

2471 

2472 expected_result = { 

2473 "find_params": { 

2474 "flavor_data": { 

2475 "disk": 10, 

2476 "ram": 1024, 

2477 "vcpus": 2, 

2478 "ephemeral": 10, 

2479 "swap": 20, 

2480 "extended": { 

2481 "numa": "there-is-numa-here", 

2482 }, 

2483 }, 

2484 }, 

2485 "params": { 

2486 "flavor_data": { 

2487 "disk": 10, 

2488 "name": "test", 

2489 "ram": 1024, 

2490 "vcpus": 2, 

2491 "ephemeral": 10, 

2492 "swap": 20, 

2493 "extended": { 

2494 "numa": "there-is-numa-here", 

2495 }, 

2496 }, 

2497 }, 

2498 } 

2499 target_flavor = { 

2500 "id": "test_id", 

2501 "name": "test", 

2502 "storage-gb": "10", 

2503 "memory-mb": "1024", 

2504 "vcpu-count": "2", 

2505 } 

2506 indata = { 

2507 "vnf": [ 

2508 { 

2509 "vdur": [ 

2510 { 

2511 "ns-flavor-id": "test_id", 

2512 "virtual-storages": [ 

2513 { 

2514 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

2515 "size-of-storage": "10", 

2516 }, 

2517 { 

2518 "type-of-storage": "etsi-nfv-descriptors:swap-storage", 

2519 "size-of-storage": "20", 

2520 }, 

2521 ], 

2522 }, 

2523 ], 

2524 "vnfd-id": "ad6356e3-698c-43bf-9901-3aae9e9b9d18", 

2525 }, 

2526 ], 

2527 } 

2528 vim_info = {} 

2529 target_record_id = "" 

2530 

2531 epa_params.return_value = { 

2532 "numa": "there-is-numa-here", 

2533 } 

2534 

2535 result = Ns._process_flavor_params( 

2536 target_flavor=target_flavor, 

2537 indata=indata, 

2538 vim_info=vim_info, 

2539 target_record_id=target_record_id, 

2540 **kwargs, 

2541 ) 

2542 

2543 self.assertTrue(epa_params.called) 

2544 self.assertDictEqual(result, expected_result) 

2545 

2546 def test__process_net_params_with_empty_params( 

2547 self, 

2548 ): 

2549 target_vld = { 

2550 "name": "vld-name", 

2551 } 

2552 indata = { 

2553 "name": "ns-name", 

2554 } 

2555 vim_info = { 

2556 "provider_network": "some-profile-here", 

2557 "ip_profile": { 

2558 "some_ip_profile": "here", 

2559 }, 

2560 } 

2561 target_record_id = "" 

2562 expected_result = { 

2563 "params": { 

2564 "net_name": "ns-name-vld-name", 

2565 "net_type": "bridge", 

2566 "ip_profile": { 

2567 "some_ip_profile": "here", 

2568 }, 

2569 "provider_network_profile": "some-profile-here", 

2570 } 

2571 } 

2572 

2573 result = Ns._process_net_params( 

2574 target_vld=target_vld, 

2575 indata=indata, 

2576 vim_info=vim_info, 

2577 target_record_id=target_record_id, 

2578 ) 

2579 

2580 self.assertDictEqual(expected_result, result) 

2581 

2582 def test__process_net_params_with_vim_info_sdn( 

2583 self, 

2584 ): 

2585 target_vld = { 

2586 "name": "vld-name", 

2587 } 

2588 indata = { 

2589 "name": "ns-name", 

2590 } 

2591 vim_info = { 

2592 "sdn": "some-sdn", 

2593 "sdn-ports": ["some", "ports", "here"], 

2594 "vlds": ["some", "vlds", "here"], 

2595 "type": "sdn-type", 

2596 } 

2597 target_record_id = "vld.sdn.something" 

2598 expected_result = { 

2599 "params": { 

2600 "sdn-ports": ["some", "ports", "here"], 

2601 "vlds": ["some", "vlds", "here"], 

2602 "type": "sdn-type", 

2603 } 

2604 } 

2605 

2606 result = Ns._process_net_params( 

2607 target_vld=target_vld, 

2608 indata=indata, 

2609 vim_info=vim_info, 

2610 target_record_id=target_record_id, 

2611 ) 

2612 

2613 self.assertDictEqual(expected_result, result) 

2614 

2615 def test__process_net_params_with_vim_info_sdn_target_vim( 

2616 self, 

2617 ): 

2618 target_vld = { 

2619 "name": "vld-name", 

2620 } 

2621 indata = { 

2622 "name": "ns-name", 

2623 } 

2624 vim_info = { 

2625 "sdn": "some-sdn", 

2626 "sdn-ports": ["some", "ports", "here"], 

2627 "vlds": ["some", "vlds", "here"], 

2628 "target_vim": "some-vim", 

2629 "type": "sdn-type", 

2630 } 

2631 target_record_id = "vld.sdn.something" 

2632 expected_result = { 

2633 "depends_on": ["some-vim vld.sdn"], 

2634 "params": { 

2635 "sdn-ports": ["some", "ports", "here"], 

2636 "vlds": ["some", "vlds", "here"], 

2637 "target_vim": "some-vim", 

2638 "type": "sdn-type", 

2639 }, 

2640 } 

2641 

2642 result = Ns._process_net_params( 

2643 target_vld=target_vld, 

2644 indata=indata, 

2645 vim_info=vim_info, 

2646 target_record_id=target_record_id, 

2647 ) 

2648 

2649 self.assertDictEqual(expected_result, result) 

2650 

2651 def test__process_net_params_with_vim_network_name( 

2652 self, 

2653 ): 

2654 target_vld = { 

2655 "name": "vld-name", 

2656 } 

2657 indata = { 

2658 "name": "ns-name", 

2659 } 

2660 vim_info = { 

2661 "vim_network_name": "some-network-name", 

2662 } 

2663 target_record_id = "vld.sdn.something" 

2664 expected_result = { 

2665 "find_params": { 

2666 "filter_dict": { 

2667 "name": "some-network-name", 

2668 }, 

2669 }, 

2670 } 

2671 

2672 result = Ns._process_net_params( 

2673 target_vld=target_vld, 

2674 indata=indata, 

2675 vim_info=vim_info, 

2676 target_record_id=target_record_id, 

2677 ) 

2678 

2679 self.assertDictEqual(expected_result, result) 

2680 

2681 def test__process_net_params_with_vim_network_id( 

2682 self, 

2683 ): 

2684 target_vld = { 

2685 "name": "vld-name", 

2686 } 

2687 indata = { 

2688 "name": "ns-name", 

2689 } 

2690 vim_info = { 

2691 "vim_network_id": "some-network-id", 

2692 } 

2693 target_record_id = "vld.sdn.something" 

2694 expected_result = { 

2695 "find_params": { 

2696 "filter_dict": { 

2697 "id": "some-network-id", 

2698 }, 

2699 }, 

2700 } 

2701 

2702 result = Ns._process_net_params( 

2703 target_vld=target_vld, 

2704 indata=indata, 

2705 vim_info=vim_info, 

2706 target_record_id=target_record_id, 

2707 ) 

2708 

2709 self.assertDictEqual(expected_result, result) 

2710 

2711 def test__process_net_params_with_mgmt_network( 

2712 self, 

2713 ): 

2714 target_vld = { 

2715 "id": "vld-id", 

2716 "name": "vld-name", 

2717 "mgmt-network": "some-mgmt-network", 

2718 } 

2719 indata = { 

2720 "name": "ns-name", 

2721 } 

2722 vim_info = {} 

2723 target_record_id = "vld.sdn.something" 

2724 expected_result = { 

2725 "find_params": { 

2726 "mgmt": True, 

2727 "name": "vld-id", 

2728 }, 

2729 } 

2730 

2731 result = Ns._process_net_params( 

2732 target_vld=target_vld, 

2733 indata=indata, 

2734 vim_info=vim_info, 

2735 target_record_id=target_record_id, 

2736 ) 

2737 

2738 self.assertDictEqual(expected_result, result) 

2739 

2740 def test__process_net_params_with_underlay_eline( 

2741 self, 

2742 ): 

2743 target_vld = { 

2744 "name": "vld-name", 

2745 "underlay": "some-underlay-here", 

2746 "type": "ELINE", 

2747 } 

2748 indata = { 

2749 "name": "ns-name", 

2750 } 

2751 vim_info = { 

2752 "provider_network": "some-profile-here", 

2753 "ip_profile": { 

2754 "some_ip_profile": "here", 

2755 }, 

2756 } 

2757 target_record_id = "" 

2758 expected_result = { 

2759 "params": { 

2760 "ip_profile": { 

2761 "some_ip_profile": "here", 

2762 }, 

2763 "net_name": "ns-name-vld-name", 

2764 "net_type": "ptp", 

2765 "provider_network_profile": "some-profile-here", 

2766 } 

2767 } 

2768 

2769 result = Ns._process_net_params( 

2770 target_vld=target_vld, 

2771 indata=indata, 

2772 vim_info=vim_info, 

2773 target_record_id=target_record_id, 

2774 ) 

2775 

2776 self.assertDictEqual(expected_result, result) 

2777 

2778 def test__process_net_params_with_underlay_elan( 

2779 self, 

2780 ): 

2781 target_vld = { 

2782 "name": "vld-name", 

2783 "underlay": "some-underlay-here", 

2784 "type": "ELAN", 

2785 } 

2786 indata = { 

2787 "name": "ns-name", 

2788 } 

2789 vim_info = { 

2790 "provider_network": "some-profile-here", 

2791 "ip_profile": { 

2792 "some_ip_profile": "here", 

2793 }, 

2794 } 

2795 target_record_id = "" 

2796 expected_result = { 

2797 "params": { 

2798 "ip_profile": { 

2799 "some_ip_profile": "here", 

2800 }, 

2801 "net_name": "ns-name-vld-name", 

2802 "net_type": "data", 

2803 "provider_network_profile": "some-profile-here", 

2804 } 

2805 } 

2806 

2807 result = Ns._process_net_params( 

2808 target_vld=target_vld, 

2809 indata=indata, 

2810 vim_info=vim_info, 

2811 target_record_id=target_record_id, 

2812 ) 

2813 

2814 self.assertDictEqual(expected_result, result) 

2815 

2816 def test__get_cloud_init_exception(self): 

2817 db_mock = MagicMock(name="database mock") 

2818 fs_mock = None 

2819 

2820 location = "" 

2821 

2822 with self.assertRaises(NsException): 

2823 Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) 

2824 

2825 def test__get_cloud_init_file_fs_exception(self): 

2826 db_mock = MagicMock(name="database mock") 

2827 fs_mock = None 

2828 

2829 location = "vnfr_id_123456:file:test_file" 

2830 db_mock.get_one.return_value = { 

2831 "_admin": { 

2832 "storage": { 

2833 "folder": "/home/osm", 

2834 "pkg-dir": "vnfr_test_dir", 

2835 }, 

2836 }, 

2837 } 

2838 

2839 with self.assertRaises(NsException): 

2840 Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) 

2841 

2842 def test__get_cloud_init_file(self): 

2843 db_mock = MagicMock(name="database mock") 

2844 fs_mock = MagicMock(name="filesystem mock") 

2845 file_mock = MagicMock(name="file mock") 

2846 

2847 location = "vnfr_id_123456:file:test_file" 

2848 cloud_init_content = "this is a cloud init file content" 

2849 

2850 db_mock.get_one.return_value = { 

2851 "_admin": { 

2852 "storage": { 

2853 "folder": "/home/osm", 

2854 "pkg-dir": "vnfr_test_dir", 

2855 }, 

2856 }, 

2857 } 

2858 fs_mock.file_open.return_value = file_mock 

2859 file_mock.__enter__.return_value.read.return_value = cloud_init_content 

2860 

2861 result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) 

2862 

2863 self.assertEqual(cloud_init_content, result) 

2864 

2865 def test__get_cloud_init_vdu(self): 

2866 db_mock = MagicMock(name="database mock") 

2867 fs_mock = None 

2868 

2869 location = "vnfr_id_123456:vdu:0" 

2870 cloud_init_content = "this is a cloud init file content" 

2871 

2872 db_mock.get_one.return_value = { 

2873 "vdu": { 

2874 0: { 

2875 "cloud-init": cloud_init_content, 

2876 }, 

2877 }, 

2878 } 

2879 

2880 result = Ns._get_cloud_init(db=db_mock, fs=fs_mock, location=location) 

2881 

2882 self.assertEqual(cloud_init_content, result) 

2883 

2884 @patch("jinja2.Environment.__init__") 

2885 def test__parse_jinja2_undefined_error(self, env_mock: Mock): 

2886 cloud_init_content = None 

2887 params = None 

2888 context = None 

2889 

2890 env_mock.side_effect = UndefinedError("UndefinedError occurred.") 

2891 

2892 with self.assertRaises(NsException): 

2893 Ns._parse_jinja2( 

2894 cloud_init_content=cloud_init_content, params=params, context=context 

2895 ) 

2896 

2897 @patch("jinja2.Environment.__init__") 

2898 def test__parse_jinja2_template_error(self, env_mock: Mock): 

2899 cloud_init_content = None 

2900 params = None 

2901 context = None 

2902 

2903 env_mock.side_effect = TemplateError("TemplateError occurred.") 

2904 

2905 with self.assertRaises(NsException): 

2906 Ns._parse_jinja2( 

2907 cloud_init_content=cloud_init_content, params=params, context=context 

2908 ) 

2909 

2910 @patch("jinja2.Environment.__init__") 

2911 def test__parse_jinja2_template_not_found(self, env_mock: Mock): 

2912 cloud_init_content = None 

2913 params = None 

2914 context = None 

2915 

2916 env_mock.side_effect = TemplateNotFound("TemplateNotFound occurred.") 

2917 

2918 with self.assertRaises(NsException): 

2919 Ns._parse_jinja2( 

2920 cloud_init_content=cloud_init_content, params=params, context=context 

2921 ) 

2922 

2923 def test_rendering_jinja2_temp_without_special_characters(self): 

2924 cloud_init_content = """ 

2925 disk_setup: 

2926 ephemeral0: 

2927 table_type: {{type}} 

2928 layout: True 

2929 overwrite: {{is_override}} 

2930 runcmd: 

2931 - [ ls, -l, / ] 

2932 - [ sh, -xc, "echo $(date) '{{command}}'" ] 

2933 """ 

2934 params = { 

2935 "type": "mbr", 

2936 "is_override": "False", 

2937 "command": "; mkdir abc", 

2938 } 

2939 context = "cloud-init for VM" 

2940 expected_result = """ 

2941 disk_setup: 

2942 ephemeral0: 

2943 table_type: mbr 

2944 layout: True 

2945 overwrite: False 

2946 runcmd: 

2947 - [ ls, -l, / ] 

2948 - [ sh, -xc, "echo $(date) '; mkdir abc'" ] 

2949 """ 

2950 result = Ns._parse_jinja2( 

2951 cloud_init_content=cloud_init_content, params=params, context=context 

2952 ) 

2953 self.assertEqual(result, expected_result) 

2954 

2955 def test_rendering_jinja2_temp_with_special_characters(self): 

2956 cloud_init_content = """ 

2957 disk_setup: 

2958 ephemeral0: 

2959 table_type: {{type}} 

2960 layout: True 

2961 overwrite: {{is_override}} 

2962 runcmd: 

2963 - [ ls, -l, / ] 

2964 - [ sh, -xc, "echo $(date) '{{command}}'" ] 

2965 """ 

2966 params = { 

2967 "type": "mbr", 

2968 "is_override": "False", 

2969 "command": "& rm -rf", 

2970 } 

2971 context = "cloud-init for VM" 

2972 expected_result = """ 

2973 disk_setup: 

2974 ephemeral0: 

2975 table_type: mbr 

2976 layout: True 

2977 overwrite: False 

2978 runcmd: 

2979 - [ ls, -l, / ] 

2980 - [ sh, -xc, "echo $(date) '& rm -rf /'" ] 

2981 """ 

2982 result = Ns._parse_jinja2( 

2983 cloud_init_content=cloud_init_content, params=params, context=context 

2984 ) 

2985 self.assertNotEqual(result, expected_result) 

2986 

2987 def test_rendering_jinja2_temp_with_special_characters_autoescape_is_false(self): 

2988 with patch("osm_ng_ro.ns.Environment") as mock_environment: 

2989 mock_environment.return_value = Environment( 

2990 undefined=StrictUndefined, 

2991 autoescape=select_autoescape(default_for_string=False, default=False), 

2992 ) 

2993 cloud_init_content = """ 

2994 disk_setup: 

2995 ephemeral0: 

2996 table_type: {{type}} 

2997 layout: True 

2998 overwrite: {{is_override}} 

2999 runcmd: 

3000 - [ ls, -l, / ] 

3001 - [ sh, -xc, "echo $(date) '{{command}}'" ] 

3002 """ 

3003 params = { 

3004 "type": "mbr", 

3005 "is_override": "False", 

3006 "command": "& rm -rf /", 

3007 } 

3008 context = "cloud-init for VM" 

3009 expected_result = """ 

3010 disk_setup: 

3011 ephemeral0: 

3012 table_type: mbr 

3013 layout: True 

3014 overwrite: False 

3015 runcmd: 

3016 - [ ls, -l, / ] 

3017 - [ sh, -xc, "echo $(date) '& rm -rf /'" ] 

3018 """ 

3019 result = Ns._parse_jinja2( 

3020 cloud_init_content=cloud_init_content, 

3021 params=params, 

3022 context=context, 

3023 ) 

3024 self.assertEqual(result, expected_result) 

3025 

3026 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3027 def test__rebuild_start_stop_task__successful(self, assign_vim): 

3028 self.ns = Ns() 

3029 extra_dict = {} 

3030 actions = ["start", "stop", "rebuild"] 

3031 vdu_index = "0" 

3032 task_index = 0 

3033 for action in actions: 

3034 params = { 

3035 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

3036 "action": action, 

3037 } 

3038 extra_dict["params"] = params 

3039 expected_result = deepcopy(expected_result_rebuild_start_stop) 

3040 expected_result["target_record"] = ( 

3041 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

3042 ) 

3043 expected_result["params"] = params 

3044 task = self.ns.rebuild_start_stop_task( 

3045 vdu_id, 

3046 vnf_id, 

3047 vdu_index, 

3048 action_id, 

3049 nsr_id_2, 

3050 task_index, 

3051 target_vim, 

3052 extra_dict, 

3053 ) 

3054 self.assertDictEqual(task, expected_result) 

3055 

3056 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3057 def test__rebuild_start_stop_task__empty_extra_dict__task_without_params( 

3058 self, assign_vim 

3059 ): 

3060 self.ns = Ns() 

3061 extra_dict = {} 

3062 actions = ["start", "stop", "rebuild"] 

3063 vdu_index = "0" 

3064 task_index = 0 

3065 expected_result = deepcopy(expected_result_rebuild_start_stop) 

3066 expected_result["target_record"] = ( 

3067 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

3068 ) 

3069 for _ in actions: 

3070 task = self.ns.rebuild_start_stop_task( 

3071 vdu_id, 

3072 vnf_id, 

3073 vdu_index, 

3074 action_id, 

3075 nsr_id_2, 

3076 task_index, 

3077 target_vim, 

3078 extra_dict, 

3079 ) 

3080 self.assertDictEqual(task, expected_result) 

3081 

3082 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3083 def test__rebuild_start_stop_task__different_vdu_index__target_record_changes( 

3084 self, assign_vim 

3085 ): 

3086 self.ns = Ns() 

3087 extra_dict = {} 

3088 actions = ["start", "stop", "rebuild"] 

3089 vdu_index = "4" 

3090 task_index = 0 

3091 for action in actions: 

3092 params = { 

3093 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

3094 "action": action, 

3095 } 

3096 extra_dict["params"] = params 

3097 expected_result = deepcopy(expected_result_rebuild_start_stop) 

3098 expected_result["target_record"] = ( 

3099 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

3100 ) 

3101 expected_result["params"] = params 

3102 task = self.ns.rebuild_start_stop_task( 

3103 vdu_id, 

3104 vnf_id, 

3105 vdu_index, 

3106 action_id, 

3107 nsr_id_2, 

3108 task_index, 

3109 target_vim, 

3110 extra_dict, 

3111 ) 

3112 self.assertDictEqual(task, expected_result) 

3113 

3114 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3115 def test__rebuild_start_stop_task__different_task_index__task_id_changes( 

3116 self, assign_vim 

3117 ): 

3118 self.ns = Ns() 

3119 extra_dict = {} 

3120 actions = ["start", "stop", "rebuild"] 

3121 vdu_index = "0" 

3122 task_index = 3 

3123 for action in actions: 

3124 params = { 

3125 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

3126 "action": action, 

3127 } 

3128 extra_dict["params"] = params 

3129 expected_result = deepcopy(expected_result_rebuild_start_stop) 

3130 expected_result["target_record"] = ( 

3131 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.0.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

3132 ) 

3133 expected_result["params"] = params 

3134 expected_result["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:3" 

3135 task = self.ns.rebuild_start_stop_task( 

3136 vdu_id, 

3137 vnf_id, 

3138 vdu_index, 

3139 action_id, 

3140 nsr_id_2, 

3141 task_index, 

3142 target_vim, 

3143 extra_dict, 

3144 ) 

3145 self.assertDictEqual(task, expected_result) 

3146 

3147 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3148 def test__rebuild_start_stop_task__assign_vim_raises__task_is_not_created( 

3149 self, assign_vim 

3150 ): 

3151 self.ns = Ns() 

3152 extra_dict = {} 

3153 actions = ["start", "stop", "rebuild"] 

3154 vdu_index = "0" 

3155 task_index = 0 

3156 for action in actions: 

3157 params = { 

3158 "vim_vm_id": "f37b18ef-3caa-4dc9-ab91-15c669b16396", 

3159 "action": action, 

3160 } 

3161 extra_dict["params"] = params 

3162 assign_vim.side_effect = TestException("Can not connect to VIM.") 

3163 with self.assertRaises(TestException) as err: 

3164 task = self.ns.rebuild_start_stop_task( 

3165 vdu_id, 

3166 vnf_id, 

3167 vdu_index, 

3168 action_id, 

3169 nsr_id_2, 

3170 task_index, 

3171 target_vim, 

3172 extra_dict, 

3173 ) 

3174 self.assertEqual(task, None) 

3175 self.assertEqual(str(err.exception), "Can not connect to VIM.") 

3176 

3177 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3178 def test_verticalscale_task__successful(self, assign_vim): 

3179 self.ns = Ns() 

3180 vdu_index = "1" 

3181 task_index = 1 

3182 task = self.ns.verticalscale_task( 

3183 vdu, 

3184 vnf, 

3185 vdu_index, 

3186 action_id, 

3187 nsr_id_2, 

3188 task_index, 

3189 extra_dict_vertical_scale, 

3190 ) 

3191 self.assertDictEqual(task, expected_result_vertical_scale) 

3192 

3193 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3194 def test_verticalscale_task__task_index_changes__task_id_changes(self, assign_vim): 

3195 self.ns = Ns() 

3196 vdu_index = "1" 

3197 task_index = 2 

3198 expected_result = deepcopy(expected_result_vertical_scale) 

3199 expected_result["task_id"] = "bb937f49-3870-4169-b758-9732e1ff40f3:2" 

3200 task = self.ns.verticalscale_task( 

3201 vdu, 

3202 vnf, 

3203 vdu_index, 

3204 action_id, 

3205 nsr_id_2, 

3206 task_index, 

3207 extra_dict_vertical_scale, 

3208 ) 

3209 self.assertDictEqual(task, expected_result) 

3210 

3211 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3212 def test_verticalscale_task__empty_extra_dict__expected_result_without_params( 

3213 self, assign_vim 

3214 ): 

3215 self.ns = Ns() 

3216 extra_dict = {"params": {}} 

3217 vdu_index = "1" 

3218 task_index = 1 

3219 expected_result = { 

3220 "target_id": "vim:f9f370ac-0d44-41a7-9000-457f2332bc35", 

3221 "action_id": "bb937f49-3870-4169-b758-9732e1ff40f3", 

3222 "nsr_id": "993166fe-723e-4680-ac4b-b1af2541ae31", 

3223 "task_id": "bb937f49-3870-4169-b758-9732e1ff40f3:1", 

3224 "status": "SCHEDULED", 

3225 "action": "EXEC", 

3226 "item": "verticalscale", 

3227 "target_record": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.1.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35", 

3228 "target_record_id": "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.bb9c43f9-10a2-4569-a8a8-957c3528b6d1", 

3229 "params": { 

3230 "flavor_id": "TASK-nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0" 

3231 }, 

3232 "depends_on": ["nsrs:993166fe-723e-4680-ac4b-b1af2541ae31:flavor.0"], 

3233 } 

3234 

3235 task = self.ns.verticalscale_task( 

3236 vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict 

3237 ) 

3238 self.assertDictEqual(task, expected_result) 

3239 

3240 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3241 def test_verticalscale_task__assign_vim_raises__task_is_not_created( 

3242 self, assign_vim 

3243 ): 

3244 self.ns = Ns() 

3245 vdu_index = "1" 

3246 task_index = 1 

3247 assign_vim.side_effect = TestException("Can not connect to VIM.") 

3248 with self.assertRaises(TestException) as err: 

3249 task = self.ns.verticalscale_task( 

3250 vdu, 

3251 vnf, 

3252 vdu_index, 

3253 action_id, 

3254 nsr_id_2, 

3255 task_index, 

3256 extra_dict_vertical_scale, 

3257 ) 

3258 self.assertEqual(task, {}) 

3259 self.assertEqual(str(err.exception), "Can not connect to VIM.") 

3260 

3261 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3262 def test_migrate_task__successful(self, assign_vim): 

3263 self.ns = Ns() 

3264 vdu_index = "1" 

3265 task_index = 1 

3266 task = self.ns.migrate_task( 

3267 vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict_migrate 

3268 ) 

3269 self.assertDictEqual(task, expected_result_migrate) 

3270 

3271 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3272 def test_migrate_task__empty_extra_dict__task_without_params(self, assign_vim): 

3273 self.ns = Ns() 

3274 extra_dict = {} 

3275 vdu_index = "1" 

3276 task_index = 1 

3277 expected_result = deepcopy(expected_result_migrate) 

3278 expected_result.pop("params") 

3279 task = self.ns.migrate_task( 

3280 vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict 

3281 ) 

3282 self.assertDictEqual(task, expected_result) 

3283 

3284 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3285 def test_migrate_task__different_vdu_index__target_record_with_different_vdu_index( 

3286 self, assign_vim 

3287 ): 

3288 self.ns = Ns() 

3289 vdu_index = "4" 

3290 task_index = 1 

3291 expected_result = deepcopy(expected_result_migrate) 

3292 expected_result["target_record"] = ( 

3293 "vnfrs:665b4165-ce24-4320-bf19-b9a45bade49f:vdur.4.vim_info.vim:f9f370ac-0d44-41a7-9000-457f2332bc35" 

3294 ) 

3295 task = self.ns.migrate_task( 

3296 vdu, vnf, vdu_index, action_id, nsr_id_2, task_index, extra_dict_migrate 

3297 ) 

3298 self.assertDictEqual(task, expected_result) 

3299 

3300 @patch("osm_ng_ro.ns.Ns._assign_vim") 

3301 def test_migrate_task__assign_vim_raises__task_is_not_created(self, assign_vim): 

3302 self.ns = Ns() 

3303 vdu_index = "1" 

3304 task_index = 1 

3305 assign_vim.side_effect = TestException("Can not connect to VIM.") 

3306 with self.assertRaises(TestException) as err: 

3307 task = self.ns.migrate_task( 

3308 vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict_migrate 

3309 ) 

3310 self.assertDictEqual(task, {}) 

3311 self.assertEqual(str(err.exception), "Can not connect to VIM.") 

3312 

3313 

3314class TestProcessVduParams(unittest.TestCase): 

3315 def setUp(self): 

3316 self.ns = Ns() 

3317 self.logger = CopyingMock(autospec=True) 

3318 

3319 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3320 def test_find_persistent_root_volumes_empty_instantiation_vol_list( 

3321 self, mock_volume_keeping_required 

3322 ): 

3323 """Find persistent root volume, instantiation_vol_list is empty.""" 

3324 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3325 target_vdu = target_vdu_wth_persistent_storage 

3326 vdu_instantiation_volumes_list = [] 

3327 disk_list = [] 

3328 mock_volume_keeping_required.return_value = True 

3329 expected_root_disk = { 

3330 "id": "persistent-root-volume", 

3331 "type-of-storage": "persistent-storage:persistent-storage", 

3332 "size-of-storage": "10", 

3333 "vdu-storage-requirements": [{"key": "keep-volume", "value": "true"}], 

3334 } 

3335 expected_persist_root_disk = { 

3336 "persistent-root-volume": { 

3337 "image_id": "ubuntu20.04", 

3338 "size": "10", 

3339 "keep": True, 

3340 } 

3341 } 

3342 expected_disk_list = [ 

3343 { 

3344 "image_id": "ubuntu20.04", 

3345 "size": "10", 

3346 "keep": True, 

3347 }, 

3348 ] 

3349 persist_root_disk = self.ns.find_persistent_root_volumes( 

3350 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3351 ) 

3352 self.assertEqual(persist_root_disk, expected_persist_root_disk) 

3353 mock_volume_keeping_required.assert_called_once_with(expected_root_disk) 

3354 self.assertEqual(disk_list, expected_disk_list) 

3355 self.assertEqual(len(disk_list), 1) 

3356 

3357 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3358 def test_find_persistent_root_volumes_always_selects_first_vsd_as_root( 

3359 self, mock_volume_keeping_required 

3360 ): 

3361 """Find persistent root volume, always selects the first vsd as root volume.""" 

3362 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3363 vnfd["vdu"][0]["virtual-storage-desc"] = [ 

3364 "persistent-volume2", 

3365 "persistent-root-volume", 

3366 "ephemeral-volume", 

3367 ] 

3368 target_vdu = target_vdu_wth_persistent_storage 

3369 vdu_instantiation_volumes_list = [] 

3370 disk_list = [] 

3371 mock_volume_keeping_required.return_value = True 

3372 expected_root_disk = { 

3373 "id": "persistent-volume2", 

3374 "type-of-storage": "persistent-storage:persistent-storage", 

3375 "size-of-storage": "10", 

3376 } 

3377 expected_persist_root_disk = { 

3378 "persistent-volume2": { 

3379 "image_id": "ubuntu20.04", 

3380 "size": "10", 

3381 "keep": True, 

3382 } 

3383 } 

3384 expected_disk_list = [ 

3385 { 

3386 "image_id": "ubuntu20.04", 

3387 "size": "10", 

3388 "keep": True, 

3389 }, 

3390 ] 

3391 persist_root_disk = self.ns.find_persistent_root_volumes( 

3392 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3393 ) 

3394 self.assertEqual(persist_root_disk, expected_persist_root_disk) 

3395 mock_volume_keeping_required.assert_called_once_with(expected_root_disk) 

3396 self.assertEqual(disk_list, expected_disk_list) 

3397 self.assertEqual(len(disk_list), 1) 

3398 

3399 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3400 def test_find_persistent_root_volumes_empty_size_of_storage( 

3401 self, mock_volume_keeping_required 

3402 ): 

3403 """Find persistent root volume, size of storage is empty.""" 

3404 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3405 vnfd["virtual-storage-desc"][0]["size-of-storage"] = "" 

3406 vnfd["vdu"][0]["virtual-storage-desc"] = [ 

3407 "persistent-volume2", 

3408 "persistent-root-volume", 

3409 "ephemeral-volume", 

3410 ] 

3411 target_vdu = target_vdu_wth_persistent_storage 

3412 vdu_instantiation_volumes_list = [] 

3413 disk_list = [] 

3414 persist_root_disk = self.ns.find_persistent_root_volumes( 

3415 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3416 ) 

3417 self.assertEqual(persist_root_disk, {}) 

3418 mock_volume_keeping_required.assert_not_called() 

3419 self.assertEqual(disk_list, []) 

3420 

3421 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3422 def test_find_persistent_root_volumes_keeping_is_not_required( 

3423 self, mock_volume_keeping_required 

3424 ): 

3425 """Find persistent root volume, volume keeping is not required.""" 

3426 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3427 vnfd["virtual-storage-desc"][1]["vdu-storage-requirements"] = [ 

3428 {"key": "keep-volume", "value": "false"}, 

3429 ] 

3430 target_vdu = target_vdu_wth_persistent_storage 

3431 vdu_instantiation_volumes_list = [] 

3432 disk_list = [] 

3433 mock_volume_keeping_required.return_value = False 

3434 expected_root_disk = { 

3435 "id": "persistent-root-volume", 

3436 "type-of-storage": "persistent-storage:persistent-storage", 

3437 "size-of-storage": "10", 

3438 "vdu-storage-requirements": [{"key": "keep-volume", "value": "false"}], 

3439 } 

3440 expected_persist_root_disk = { 

3441 "persistent-root-volume": { 

3442 "image_id": "ubuntu20.04", 

3443 "size": "10", 

3444 "keep": False, 

3445 } 

3446 } 

3447 expected_disk_list = [ 

3448 { 

3449 "image_id": "ubuntu20.04", 

3450 "size": "10", 

3451 "keep": False, 

3452 }, 

3453 ] 

3454 persist_root_disk = self.ns.find_persistent_root_volumes( 

3455 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3456 ) 

3457 self.assertEqual(persist_root_disk, expected_persist_root_disk) 

3458 mock_volume_keeping_required.assert_called_once_with(expected_root_disk) 

3459 self.assertEqual(disk_list, expected_disk_list) 

3460 self.assertEqual(len(disk_list), 1) 

3461 

3462 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3463 def test_find_persistent_root_volumes_target_vdu_mismatch( 

3464 self, mock_volume_keeping_required 

3465 ): 

3466 """Find persistent root volume, target vdu name is not matching.""" 

3467 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3468 vnfd["vdu"][0]["name"] = "Several_Volumes-VM" 

3469 target_vdu = target_vdu_wth_persistent_storage 

3470 vdu_instantiation_volumes_list = [] 

3471 disk_list = [] 

3472 result = self.ns.find_persistent_root_volumes( 

3473 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3474 ) 

3475 self.assertEqual(result, None) 

3476 mock_volume_keeping_required.assert_not_called() 

3477 self.assertEqual(disk_list, []) 

3478 self.assertEqual(len(disk_list), 0) 

3479 

3480 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3481 def test_find_persistent_root_volumes_with_instantiation_vol_list( 

3482 self, mock_volume_keeping_required 

3483 ): 

3484 """Find persistent root volume, existing volume needs to be used.""" 

3485 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3486 target_vdu = target_vdu_wth_persistent_storage 

3487 vdu_instantiation_volumes_list = [ 

3488 { 

3489 "vim-volume-id": vim_volume_id, 

3490 "name": "persistent-root-volume", 

3491 } 

3492 ] 

3493 disk_list = [] 

3494 expected_persist_root_disk = { 

3495 "persistent-root-volume": { 

3496 "vim_volume_id": vim_volume_id, 

3497 "image_id": "ubuntu20.04", 

3498 }, 

3499 } 

3500 expected_disk_list = [ 

3501 { 

3502 "vim_volume_id": vim_volume_id, 

3503 "image_id": "ubuntu20.04", 

3504 }, 

3505 ] 

3506 persist_root_disk = self.ns.find_persistent_root_volumes( 

3507 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3508 ) 

3509 self.assertEqual(persist_root_disk, expected_persist_root_disk) 

3510 mock_volume_keeping_required.assert_not_called() 

3511 self.assertEqual(disk_list, expected_disk_list) 

3512 self.assertEqual(len(disk_list), 1) 

3513 

3514 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3515 def test_find_persistent_root_volumes_invalid_instantiation_params( 

3516 self, mock_volume_keeping_required 

3517 ): 

3518 """Find persistent root volume, existing volume id keyword is invalid.""" 

3519 vnfd = deepcopy(vnfd_wth_persistent_storage) 

3520 target_vdu = target_vdu_wth_persistent_storage 

3521 vdu_instantiation_volumes_list = [ 

3522 { 

3523 "volume-id": vim_volume_id, 

3524 "name": "persistent-root-volume", 

3525 } 

3526 ] 

3527 disk_list = [] 

3528 with self.assertRaises(KeyError): 

3529 self.ns.find_persistent_root_volumes( 

3530 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list 

3531 ) 

3532 mock_volume_keeping_required.assert_not_called() 

3533 self.assertEqual(disk_list, []) 

3534 self.assertEqual(len(disk_list), 0) 

3535 

3536 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3537 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_inst_vol_list( 

3538 self, mock_volume_keeping_required 

3539 ): 

3540 """Find persistent ordinary volume, there is persistent root disk and instatiation volume list is empty.""" 

3541 persistent_root_disk = { 

3542 "persistent-root-volume": { 

3543 "image_id": "ubuntu20.04", 

3544 "size": "10", 

3545 "keep": False, 

3546 } 

3547 } 

3548 mock_volume_keeping_required.return_value = False 

3549 target_vdu = target_vdu_wth_persistent_storage 

3550 vdu_instantiation_volumes_list = [] 

3551 disk_list = [ 

3552 { 

3553 "image_id": "ubuntu20.04", 

3554 "size": "10", 

3555 "keep": False, 

3556 }, 

3557 ] 

3558 expected_disk = { 

3559 "id": "persistent-volume2", 

3560 "size-of-storage": "10", 

3561 "type-of-storage": "persistent-storage:persistent-storage", 

3562 } 

3563 expected_disk_list = [ 

3564 { 

3565 "image_id": "ubuntu20.04", 

3566 "size": "10", 

3567 "keep": False, 

3568 }, 

3569 { 

3570 "size": "10", 

3571 "keep": False, 

3572 }, 

3573 ] 

3574 self.ns.find_persistent_volumes( 

3575 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list 

3576 ) 

3577 self.assertEqual(disk_list, expected_disk_list) 

3578 mock_volume_keeping_required.assert_called_once_with(expected_disk) 

3579 

3580 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3581 def test_find_persistent_volumes_vdu_wth_inst_vol_list( 

3582 self, mock_volume_keeping_required 

3583 ): 

3584 """Find persistent ordinary volume, vim-volume-id is given as instantiation parameter.""" 

3585 persistent_root_disk = { 

3586 "persistent-root-volume": { 

3587 "image_id": "ubuntu20.04", 

3588 "size": "10", 

3589 "keep": False, 

3590 } 

3591 } 

3592 vdu_instantiation_volumes_list = [ 

3593 { 

3594 "vim-volume-id": vim_volume_id, 

3595 "name": "persistent-volume2", 

3596 } 

3597 ] 

3598 target_vdu = target_vdu_wth_persistent_storage 

3599 disk_list = [ 

3600 { 

3601 "image_id": "ubuntu20.04", 

3602 "size": "10", 

3603 "keep": False, 

3604 }, 

3605 ] 

3606 expected_disk_list = [ 

3607 { 

3608 "image_id": "ubuntu20.04", 

3609 "size": "10", 

3610 "keep": False, 

3611 }, 

3612 { 

3613 "vim_volume_id": vim_volume_id, 

3614 }, 

3615 ] 

3616 self.ns.find_persistent_volumes( 

3617 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list 

3618 ) 

3619 self.assertEqual(disk_list, expected_disk_list) 

3620 mock_volume_keeping_required.assert_not_called() 

3621 

3622 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3623 def test_find_persistent_volumes_vdu_wthout_persistent_storage( 

3624 self, mock_volume_keeping_required 

3625 ): 

3626 """Find persistent ordinary volume, there is not any persistent disk.""" 

3627 persistent_root_disk = {} 

3628 vdu_instantiation_volumes_list = [] 

3629 mock_volume_keeping_required.return_value = False 

3630 target_vdu = target_vdu_wthout_persistent_storage 

3631 disk_list = [] 

3632 self.ns.find_persistent_volumes( 

3633 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list 

3634 ) 

3635 self.assertEqual(disk_list, disk_list) 

3636 mock_volume_keeping_required.assert_not_called() 

3637 

3638 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3639 def test_find_persistent_volumes_vdu_wth_persistent_root_disk_wthout_ordinary_disk( 

3640 self, mock_volume_keeping_required 

3641 ): 

3642 """There is persistent root disk, but there is not ordinary persistent disk.""" 

3643 persistent_root_disk = { 

3644 "persistent-root-volume": { 

3645 "image_id": "ubuntu20.04", 

3646 "size": "10", 

3647 "keep": False, 

3648 } 

3649 } 

3650 vdu_instantiation_volumes_list = [] 

3651 mock_volume_keeping_required.return_value = False 

3652 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3653 target_vdu["virtual-storages"] = [ 

3654 { 

3655 "id": "persistent-root-volume", 

3656 "size-of-storage": "10", 

3657 "type-of-storage": "persistent-storage:persistent-storage", 

3658 "vdu-storage-requirements": [ 

3659 {"key": "keep-volume", "value": "true"}, 

3660 ], 

3661 }, 

3662 { 

3663 "id": "ephemeral-volume", 

3664 "size-of-storage": "1", 

3665 "type-of-storage": "etsi-nfv-descriptors:ephemeral-storage", 

3666 }, 

3667 ] 

3668 disk_list = [ 

3669 { 

3670 "image_id": "ubuntu20.04", 

3671 "size": "10", 

3672 "keep": False, 

3673 }, 

3674 ] 

3675 self.ns.find_persistent_volumes( 

3676 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list 

3677 ) 

3678 self.assertEqual(disk_list, disk_list) 

3679 mock_volume_keeping_required.assert_not_called() 

3680 

3681 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

3682 def test_find_persistent_volumes_wth_inst_vol_list_disk_id_mismatch( 

3683 self, mock_volume_keeping_required 

3684 ): 

3685 """Find persistent ordinary volume, volume id is not persistent_root_disk dict, 

3686 vim-volume-id is given as instantiation parameter but disk id is not matching. 

3687 """ 

3688 mock_volume_keeping_required.return_value = True 

3689 vdu_instantiation_volumes_list = [ 

3690 { 

3691 "vim-volume-id": vim_volume_id, 

3692 "name": "persistent-volume3", 

3693 } 

3694 ] 

3695 persistent_root_disk = { 

3696 "persistent-root-volume": { 

3697 "image_id": "ubuntu20.04", 

3698 "size": "10", 

3699 "keep": False, 

3700 } 

3701 } 

3702 disk_list = [ 

3703 { 

3704 "image_id": "ubuntu20.04", 

3705 "size": "10", 

3706 "keep": False, 

3707 }, 

3708 ] 

3709 expected_disk_list = [ 

3710 { 

3711 "image_id": "ubuntu20.04", 

3712 "size": "10", 

3713 "keep": False, 

3714 }, 

3715 { 

3716 "size": "10", 

3717 "keep": True, 

3718 }, 

3719 ] 

3720 expected_disk = { 

3721 "id": "persistent-volume2", 

3722 "size-of-storage": "10", 

3723 "type-of-storage": "persistent-storage:persistent-storage", 

3724 } 

3725 target_vdu = target_vdu_wth_persistent_storage 

3726 self.ns.find_persistent_volumes( 

3727 persistent_root_disk, target_vdu, vdu_instantiation_volumes_list, disk_list 

3728 ) 

3729 self.assertEqual(disk_list, expected_disk_list) 

3730 mock_volume_keeping_required.assert_called_once_with(expected_disk) 

3731 

3732 def test_is_volume_keeping_required_true(self): 

3733 """Volume keeping is required.""" 

3734 virtual_storage_descriptor = { 

3735 "id": "persistent-root-volume", 

3736 "type-of-storage": "persistent-storage:persistent-storage", 

3737 "size-of-storage": "10", 

3738 "vdu-storage-requirements": [ 

3739 {"key": "keep-volume", "value": "true"}, 

3740 ], 

3741 } 

3742 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor) 

3743 self.assertEqual(result, True) 

3744 

3745 def test_is_volume_keeping_required_false(self): 

3746 """Volume keeping is not required.""" 

3747 virtual_storage_descriptor = { 

3748 "id": "persistent-root-volume", 

3749 "type-of-storage": "persistent-storage:persistent-storage", 

3750 "size-of-storage": "10", 

3751 "vdu-storage-requirements": [ 

3752 {"key": "keep-volume", "value": "false"}, 

3753 ], 

3754 } 

3755 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor) 

3756 self.assertEqual(result, False) 

3757 

3758 def test_is_volume_keeping_required_wthout_vdu_storage_reqirement(self): 

3759 """Volume keeping is not required, vdu-storage-requirements key does not exist.""" 

3760 virtual_storage_descriptor = { 

3761 "id": "persistent-root-volume", 

3762 "type-of-storage": "persistent-storage:persistent-storage", 

3763 "size-of-storage": "10", 

3764 } 

3765 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor) 

3766 self.assertEqual(result, False) 

3767 

3768 def test_is_volume_keeping_required_wrong_keyword(self): 

3769 """vdu-storage-requirements key to indicate keeping-volume is wrong.""" 

3770 virtual_storage_descriptor = { 

3771 "id": "persistent-root-volume", 

3772 "type-of-storage": "persistent-storage:persistent-storage", 

3773 "size-of-storage": "10", 

3774 "vdu-storage-requirements": [ 

3775 {"key": "hold-volume", "value": "true"}, 

3776 ], 

3777 } 

3778 result = self.ns.is_volume_keeping_required(virtual_storage_descriptor) 

3779 self.assertEqual(result, False) 

3780 

3781 def test_sort_vdu_interfaces_position_all_wth_positions(self): 

3782 """Interfaces are sorted according to position, all have positions.""" 

3783 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3784 target_vdu["interfaces"] = [ 

3785 { 

3786 "name": "vdu-eth1", 

3787 "ns-vld-id": "datanet", 

3788 "position": 2, 

3789 }, 

3790 { 

3791 "name": "vdu-eth0", 

3792 "ns-vld-id": "mgmtnet", 

3793 "position": 1, 

3794 }, 

3795 ] 

3796 sorted_interfaces = [ 

3797 { 

3798 "name": "vdu-eth0", 

3799 "ns-vld-id": "mgmtnet", 

3800 "position": 1, 

3801 }, 

3802 { 

3803 "name": "vdu-eth1", 

3804 "ns-vld-id": "datanet", 

3805 "position": 2, 

3806 }, 

3807 ] 

3808 self.ns._sort_vdu_interfaces(target_vdu) 

3809 self.assertEqual(target_vdu["interfaces"], sorted_interfaces) 

3810 

3811 def test_sort_vdu_interfaces_position_some_wth_position(self): 

3812 """Interfaces are sorted according to position, some of them have positions.""" 

3813 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3814 target_vdu["interfaces"] = [ 

3815 { 

3816 "name": "vdu-eth0", 

3817 "ns-vld-id": "mgmtnet", 

3818 }, 

3819 { 

3820 "name": "vdu-eth1", 

3821 "ns-vld-id": "datanet", 

3822 "position": 1, 

3823 }, 

3824 ] 

3825 sorted_interfaces = [ 

3826 { 

3827 "name": "vdu-eth1", 

3828 "ns-vld-id": "datanet", 

3829 "position": 1, 

3830 }, 

3831 { 

3832 "name": "vdu-eth0", 

3833 "ns-vld-id": "mgmtnet", 

3834 }, 

3835 ] 

3836 self.ns._sort_vdu_interfaces(target_vdu) 

3837 self.assertEqual(target_vdu["interfaces"], sorted_interfaces) 

3838 

3839 def test_sort_vdu_interfaces_position_empty_interface_list(self): 

3840 """Interface list is empty.""" 

3841 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3842 target_vdu["interfaces"] = [] 

3843 sorted_interfaces = [] 

3844 self.ns._sort_vdu_interfaces(target_vdu) 

3845 self.assertEqual(target_vdu["interfaces"], sorted_interfaces) 

3846 

3847 def test_partially_locate_vdu_interfaces(self): 

3848 """Some interfaces have positions.""" 

3849 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3850 target_vdu["interfaces"] = [ 

3851 { 

3852 "name": "vdu-eth1", 

3853 "ns-vld-id": "net1", 

3854 }, 

3855 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}, 

3856 { 

3857 "name": "vdu-eth3", 

3858 "ns-vld-id": "mgmtnet", 

3859 }, 

3860 { 

3861 "name": "vdu-eth1", 

3862 "ns-vld-id": "datanet", 

3863 "position": 1, 

3864 }, 

3865 ] 

3866 self.ns._partially_locate_vdu_interfaces(target_vdu) 

3867 self.assertDictEqual( 

3868 target_vdu["interfaces"][0], 

3869 { 

3870 "name": "vdu-eth1", 

3871 "ns-vld-id": "datanet", 

3872 "position": 1, 

3873 }, 

3874 ) 

3875 self.assertDictEqual( 

3876 target_vdu["interfaces"][2], 

3877 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}, 

3878 ) 

3879 

3880 def test_partially_locate_vdu_interfaces_position_start_from_0(self): 

3881 """Some interfaces have positions, position start from 0.""" 

3882 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3883 target_vdu["interfaces"] = [ 

3884 { 

3885 "name": "vdu-eth1", 

3886 "ns-vld-id": "net1", 

3887 }, 

3888 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}, 

3889 { 

3890 "name": "vdu-eth3", 

3891 "ns-vld-id": "mgmtnet", 

3892 }, 

3893 { 

3894 "name": "vdu-eth1", 

3895 "ns-vld-id": "datanet", 

3896 "position": 0, 

3897 }, 

3898 ] 

3899 self.ns._partially_locate_vdu_interfaces(target_vdu) 

3900 self.assertDictEqual( 

3901 target_vdu["interfaces"][0], 

3902 { 

3903 "name": "vdu-eth1", 

3904 "ns-vld-id": "datanet", 

3905 "position": 0, 

3906 }, 

3907 ) 

3908 self.assertDictEqual( 

3909 target_vdu["interfaces"][3], 

3910 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 3}, 

3911 ) 

3912 

3913 def test_partially_locate_vdu_interfaces_wthout_position(self): 

3914 """Interfaces do not have positions.""" 

3915 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3916 target_vdu["interfaces"] = interfaces_wthout_positions 

3917 expected_result = deepcopy(target_vdu["interfaces"]) 

3918 self.ns._partially_locate_vdu_interfaces(target_vdu) 

3919 self.assertEqual(target_vdu["interfaces"], expected_result) 

3920 

3921 def test_partially_locate_vdu_interfaces_all_has_position(self): 

3922 """All interfaces have position.""" 

3923 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3924 target_vdu["interfaces"] = interfaces_wth_all_positions 

3925 expected_interfaces = [ 

3926 { 

3927 "name": "vdu-eth2", 

3928 "ns-vld-id": "net2", 

3929 "position": 0, 

3930 }, 

3931 { 

3932 "name": "vdu-eth3", 

3933 "ns-vld-id": "mgmtnet", 

3934 "position": 1, 

3935 }, 

3936 { 

3937 "name": "vdu-eth1", 

3938 "ns-vld-id": "net1", 

3939 "position": 2, 

3940 }, 

3941 ] 

3942 self.ns._partially_locate_vdu_interfaces(target_vdu) 

3943 self.assertEqual(target_vdu["interfaces"], expected_interfaces) 

3944 

3945 @patch("osm_ng_ro.ns.Ns._get_cloud_init") 

3946 @patch("osm_ng_ro.ns.Ns._parse_jinja2") 

3947 def test_prepare_vdu_cloud_init(self, mock_parse_jinja2, mock_get_cloud_init): 

3948 """Target_vdu has cloud-init and boot-data-drive.""" 

3949 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3950 target_vdu["cloud-init"] = "sample-cloud-init-path" 

3951 target_vdu["boot-data-drive"] = "vda" 

3952 vdu2cloud_init = {} 

3953 mock_get_cloud_init.return_value = cloud_init_content 

3954 mock_parse_jinja2.return_value = user_data 

3955 expected_result = { 

3956 "user-data": user_data, 

3957 "boot-data-drive": "vda", 

3958 } 

3959 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

3960 self.assertDictEqual(result, expected_result) 

3961 mock_get_cloud_init.assert_called_once_with( 

3962 db=db, fs=fs, location="sample-cloud-init-path" 

3963 ) 

3964 mock_parse_jinja2.assert_called_once_with( 

3965 cloud_init_content=cloud_init_content, 

3966 params=None, 

3967 context="sample-cloud-init-path", 

3968 ) 

3969 

3970 @patch("osm_ng_ro.ns.Ns._get_cloud_init") 

3971 @patch("osm_ng_ro.ns.Ns._parse_jinja2") 

3972 def test_prepare_vdu_cloud_init_get_cloud_init_raise_exception( 

3973 self, mock_parse_jinja2, mock_get_cloud_init 

3974 ): 

3975 """Target_vdu has cloud-init and boot-data-drive, get_cloud_init method raises exception.""" 

3976 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

3977 target_vdu["cloud-init"] = "sample-cloud-init-path" 

3978 target_vdu["boot-data-drive"] = "vda" 

3979 vdu2cloud_init = {} 

3980 mock_get_cloud_init.side_effect = NsException( 

3981 "Mismatch descriptor for cloud init." 

3982 ) 

3983 

3984 with self.assertRaises(NsException) as err: 

3985 self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

3986 self.assertEqual(str(err.exception), "Mismatch descriptor for cloud init.") 

3987 

3988 mock_get_cloud_init.assert_called_once_with( 

3989 db=db, fs=fs, location="sample-cloud-init-path" 

3990 ) 

3991 mock_parse_jinja2.assert_not_called() 

3992 

3993 @patch("osm_ng_ro.ns.Ns._get_cloud_init") 

3994 @patch("osm_ng_ro.ns.Ns._parse_jinja2") 

3995 def test_prepare_vdu_cloud_init_parse_jinja2_raise_exception( 

3996 self, mock_parse_jinja2, mock_get_cloud_init 

3997 ): 

3998 """Target_vdu has cloud-init and boot-data-drive, parse_jinja2 method raises exception.""" 

3999 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4000 target_vdu["cloud-init"] = "sample-cloud-init-path" 

4001 target_vdu["boot-data-drive"] = "vda" 

4002 vdu2cloud_init = {} 

4003 mock_get_cloud_init.return_value = cloud_init_content 

4004 mock_parse_jinja2.side_effect = NsException("Error parsing cloud-init content.") 

4005 

4006 with self.assertRaises(NsException) as err: 

4007 self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

4008 self.assertEqual(str(err.exception), "Error parsing cloud-init content.") 

4009 mock_get_cloud_init.assert_called_once_with( 

4010 db=db, fs=fs, location="sample-cloud-init-path" 

4011 ) 

4012 mock_parse_jinja2.assert_called_once_with( 

4013 cloud_init_content=cloud_init_content, 

4014 params=None, 

4015 context="sample-cloud-init-path", 

4016 ) 

4017 

4018 @patch("osm_ng_ro.ns.Ns._get_cloud_init") 

4019 @patch("osm_ng_ro.ns.Ns._parse_jinja2") 

4020 def test_prepare_vdu_cloud_init_vdu_wthout_boot_data_drive( 

4021 self, mock_parse_jinja2, mock_get_cloud_init 

4022 ): 

4023 """Target_vdu has cloud-init but do not have boot-data-drive.""" 

4024 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4025 target_vdu["cloud-init"] = "sample-cloud-init-path" 

4026 vdu2cloud_init = {} 

4027 mock_get_cloud_init.return_value = cloud_init_content 

4028 mock_parse_jinja2.return_value = user_data 

4029 expected_result = { 

4030 "user-data": user_data, 

4031 } 

4032 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

4033 self.assertDictEqual(result, expected_result) 

4034 mock_get_cloud_init.assert_called_once_with( 

4035 db=db, fs=fs, location="sample-cloud-init-path" 

4036 ) 

4037 mock_parse_jinja2.assert_called_once_with( 

4038 cloud_init_content=cloud_init_content, 

4039 params=None, 

4040 context="sample-cloud-init-path", 

4041 ) 

4042 

4043 @patch("osm_ng_ro.ns.Ns._get_cloud_init") 

4044 @patch("osm_ng_ro.ns.Ns._parse_jinja2") 

4045 def test_prepare_vdu_cloud_init_exists_in_vdu2cloud_init( 

4046 self, mock_parse_jinja2, mock_get_cloud_init 

4047 ): 

4048 """Target_vdu has cloud-init, vdu2cloud_init dict has cloud-init_content.""" 

4049 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4050 target_vdu["cloud-init"] = "sample-cloud-init-path" 

4051 target_vdu["boot-data-drive"] = "vda" 

4052 vdu2cloud_init = {"sample-cloud-init-path": cloud_init_content} 

4053 mock_parse_jinja2.return_value = user_data 

4054 expected_result = { 

4055 "user-data": user_data, 

4056 "boot-data-drive": "vda", 

4057 } 

4058 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

4059 self.assertDictEqual(result, expected_result) 

4060 mock_get_cloud_init.assert_not_called() 

4061 mock_parse_jinja2.assert_called_once_with( 

4062 cloud_init_content=cloud_init_content, 

4063 params=None, 

4064 context="sample-cloud-init-path", 

4065 ) 

4066 

4067 @patch("osm_ng_ro.ns.Ns._get_cloud_init") 

4068 @patch("osm_ng_ro.ns.Ns._parse_jinja2") 

4069 def test_prepare_vdu_cloud_init_no_cloud_init( 

4070 self, mock_parse_jinja2, mock_get_cloud_init 

4071 ): 

4072 """Target_vdu do not have cloud-init.""" 

4073 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4074 target_vdu["boot-data-drive"] = "vda" 

4075 vdu2cloud_init = {} 

4076 expected_result = { 

4077 "boot-data-drive": "vda", 

4078 } 

4079 result = self.ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs) 

4080 self.assertDictEqual(result, expected_result) 

4081 mock_get_cloud_init.assert_not_called() 

4082 mock_parse_jinja2.assert_not_called() 

4083 

4084 def test_check_vld_information_of_interfaces_ns_vld_vnf_vld_both_exist(self): 

4085 """ns_vld and vnf_vld both exist.""" 

4086 interface = { 

4087 "name": "vdu-eth0", 

4088 "ns-vld-id": "mgmtnet", 

4089 "vnf-vld-id": "mgmt_cp_int", 

4090 } 

4091 expected_result = f"{ns_preffix}:vld.mgmtnet" 

4092 result = self.ns._check_vld_information_of_interfaces( 

4093 interface, ns_preffix, vnf_preffix 

4094 ) 

4095 self.assertEqual(result, expected_result) 

4096 

4097 def test_check_vld_information_of_interfaces_empty_interfaces(self): 

4098 """Interface dict is empty.""" 

4099 interface = {} 

4100 result = self.ns._check_vld_information_of_interfaces( 

4101 interface, ns_preffix, vnf_preffix 

4102 ) 

4103 self.assertEqual(result, "") 

4104 

4105 def test_check_vld_information_of_interfaces_has_only_vnf_vld(self): 

4106 """Interface dict has only vnf_vld.""" 

4107 interface = { 

4108 "name": "vdu-eth0", 

4109 "vnf-vld-id": "mgmt_cp_int", 

4110 } 

4111 expected_result = f"{vnf_preffix}:vld.mgmt_cp_int" 

4112 result = self.ns._check_vld_information_of_interfaces( 

4113 interface, ns_preffix, vnf_preffix 

4114 ) 

4115 self.assertEqual(result, expected_result) 

4116 

4117 def test_check_vld_information_of_interfaces_has_vnf_vld_wthout_vnf_prefix( 

4118 self, 

4119 ): 

4120 """Interface dict has only vnf_vld but vnf_preffix does not exist.""" 

4121 interface = { 

4122 "name": "vdu-eth0", 

4123 "vnf-vld-id": "mgmt_cp_int", 

4124 } 

4125 vnf_preffix = None 

4126 with self.assertRaises(Exception) as err: 

4127 self.ns._check_vld_information_of_interfaces( 

4128 interface, ns_preffix, vnf_preffix 

4129 ) 

4130 self.assertEqual(type(err), TypeError) 

4131 

4132 def test_prepare_interface_port_security_has_security_details(self): 

4133 """Interface dict has port security details.""" 

4134 interface = { 

4135 "name": "vdu-eth0", 

4136 "ns-vld-id": "mgmtnet", 

4137 "vnf-vld-id": "mgmt_cp_int", 

4138 "port-security-enabled": True, 

4139 "port-security-disable-strategy": "allow-address-pairs", 

4140 } 

4141 expected_interface = { 

4142 "name": "vdu-eth0", 

4143 "ns-vld-id": "mgmtnet", 

4144 "vnf-vld-id": "mgmt_cp_int", 

4145 "port_security": True, 

4146 "port_security_disable_strategy": "allow-address-pairs", 

4147 } 

4148 self.ns._prepare_interface_port_security(interface) 

4149 self.assertDictEqual(interface, expected_interface) 

4150 

4151 def test_prepare_interface_port_security_empty_interfaces(self): 

4152 """Interface dict is empty.""" 

4153 interface = {} 

4154 expected_interface = {} 

4155 self.ns._prepare_interface_port_security(interface) 

4156 self.assertDictEqual(interface, expected_interface) 

4157 

4158 def test_prepare_interface_port_security_wthout_port_security(self): 

4159 """Interface dict does not have port security details.""" 

4160 interface = { 

4161 "name": "vdu-eth0", 

4162 "ns-vld-id": "mgmtnet", 

4163 "vnf-vld-id": "mgmt_cp_int", 

4164 } 

4165 expected_interface = { 

4166 "name": "vdu-eth0", 

4167 "ns-vld-id": "mgmtnet", 

4168 "vnf-vld-id": "mgmt_cp_int", 

4169 } 

4170 self.ns._prepare_interface_port_security(interface) 

4171 self.assertDictEqual(interface, expected_interface) 

4172 

4173 def test_create_net_item_of_interface_floating_ip_port_security(self): 

4174 """Interface dict has floating ip, port-security details.""" 

4175 interface = { 

4176 "name": "vdu-eth0", 

4177 "vcpi": "sample_vcpi", 

4178 "port_security": True, 

4179 "port_security_disable_strategy": "allow-address-pairs", 

4180 "floating_ip": "10.1.1.12", 

4181 "ns-vld-id": "mgmtnet", 

4182 "vnf-vld-id": "mgmt_cp_int", 

4183 } 

4184 net_text = f"{ns_preffix}" 

4185 expected_net_item = { 

4186 "name": "vdu-eth0", 

4187 "port_security": True, 

4188 "port_security_disable_strategy": "allow-address-pairs", 

4189 "floating_ip": "10.1.1.12", 

4190 "net_id": f"TASK-{ns_preffix}", 

4191 "type": "virtual", 

4192 } 

4193 result = self.ns._create_net_item_of_interface(interface, net_text) 

4194 self.assertDictEqual(result, expected_net_item) 

4195 

4196 def test_create_net_item_of_interface_invalid_net_text(self): 

4197 """net-text is invalid.""" 

4198 interface = { 

4199 "name": "vdu-eth0", 

4200 "vcpi": "sample_vcpi", 

4201 "port_security": True, 

4202 "port_security_disable_strategy": "allow-address-pairs", 

4203 "floating_ip": "10.1.1.12", 

4204 "ns-vld-id": "mgmtnet", 

4205 "vnf-vld-id": "mgmt_cp_int", 

4206 } 

4207 net_text = None 

4208 with self.assertRaises(TypeError): 

4209 self.ns._create_net_item_of_interface(interface, net_text) 

4210 

4211 def test_create_net_item_of_interface_empty_interface(self): 

4212 """Interface dict is empty.""" 

4213 interface = {} 

4214 net_text = ns_preffix 

4215 expected_net_item = { 

4216 "net_id": f"TASK-{ns_preffix}", 

4217 "type": "virtual", 

4218 } 

4219 result = self.ns._create_net_item_of_interface(interface, net_text) 

4220 self.assertDictEqual(result, expected_net_item) 

4221 

4222 @patch("osm_ng_ro.ns.deep_get") 

4223 def test_prepare_type_of_interface_type_sriov(self, mock_deep_get): 

4224 """Interface type is SR-IOV.""" 

4225 interface = { 

4226 "name": "vdu-eth0", 

4227 "vcpi": "sample_vcpi", 

4228 "port_security": True, 

4229 "port_security_disable_strategy": "allow-address-pairs", 

4230 "floating_ip": "10.1.1.12", 

4231 "ns-vld-id": "mgmtnet", 

4232 "vnf-vld-id": "mgmt_cp_int", 

4233 "type": "SR-IOV", 

4234 } 

4235 mock_deep_get.return_value = "SR-IOV" 

4236 net_text = ns_preffix 

4237 net_item = {} 

4238 expected_net_item = { 

4239 "use": "data", 

4240 "model": "SR-IOV", 

4241 "type": "SR-IOV", 

4242 } 

4243 self.ns._prepare_type_of_interface( 

4244 interface, tasks_by_target_record_id, net_text, net_item 

4245 ) 

4246 self.assertDictEqual(net_item, expected_net_item) 

4247 self.assertEqual( 

4248 "data", 

4249 tasks_by_target_record_id[net_text]["extra_dict"]["params"]["net_type"], 

4250 ) 

4251 mock_deep_get.assert_called_once_with( 

4252 tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type" 

4253 ) 

4254 

4255 @patch("osm_ng_ro.ns.deep_get") 

4256 def test_prepare_type_of_interface_type_pic_passthrough_deep_get_return_empty_dict( 

4257 self, mock_deep_get 

4258 ): 

4259 """Interface type is PCI-PASSTHROUGH, deep_get method return empty dict.""" 

4260 interface = { 

4261 "name": "vdu-eth0", 

4262 "vcpi": "sample_vcpi", 

4263 "port_security": True, 

4264 "port_security_disable_strategy": "allow-address-pairs", 

4265 "floating_ip": "10.1.1.12", 

4266 "ns-vld-id": "mgmtnet", 

4267 "vnf-vld-id": "mgmt_cp_int", 

4268 "type": "PCI-PASSTHROUGH", 

4269 } 

4270 mock_deep_get.return_value = {} 

4271 tasks_by_target_record_id = {} 

4272 net_text = ns_preffix 

4273 net_item = {} 

4274 expected_net_item = { 

4275 "use": "data", 

4276 "model": "PCI-PASSTHROUGH", 

4277 "type": "PCI-PASSTHROUGH", 

4278 } 

4279 self.ns._prepare_type_of_interface( 

4280 interface, tasks_by_target_record_id, net_text, net_item 

4281 ) 

4282 self.assertDictEqual(net_item, expected_net_item) 

4283 mock_deep_get.assert_called_once_with( 

4284 tasks_by_target_record_id, net_text, "extra_dict", "params", "net_type" 

4285 ) 

4286 

4287 @patch("osm_ng_ro.ns.deep_get") 

4288 def test_prepare_type_of_interface_type_mgmt(self, mock_deep_get): 

4289 """Interface type is mgmt.""" 

4290 interface = { 

4291 "name": "vdu-eth0", 

4292 "vcpi": "sample_vcpi", 

4293 "port_security": True, 

4294 "port_security_disable_strategy": "allow-address-pairs", 

4295 "floating_ip": "10.1.1.12", 

4296 "ns-vld-id": "mgmtnet", 

4297 "vnf-vld-id": "mgmt_cp_int", 

4298 "type": "OM-MGMT", 

4299 } 

4300 tasks_by_target_record_id = {} 

4301 net_text = ns_preffix 

4302 net_item = {} 

4303 expected_net_item = { 

4304 "use": "mgmt", 

4305 } 

4306 self.ns._prepare_type_of_interface( 

4307 interface, tasks_by_target_record_id, net_text, net_item 

4308 ) 

4309 self.assertDictEqual(net_item, expected_net_item) 

4310 mock_deep_get.assert_not_called() 

4311 

4312 @patch("osm_ng_ro.ns.deep_get") 

4313 def test_prepare_type_of_interface_type_bridge(self, mock_deep_get): 

4314 """Interface type is bridge.""" 

4315 interface = { 

4316 "name": "vdu-eth0", 

4317 "vcpi": "sample_vcpi", 

4318 "port_security": True, 

4319 "port_security_disable_strategy": "allow-address-pairs", 

4320 "floating_ip": "10.1.1.12", 

4321 "ns-vld-id": "mgmtnet", 

4322 "vnf-vld-id": "mgmt_cp_int", 

4323 } 

4324 tasks_by_target_record_id = {} 

4325 net_text = ns_preffix 

4326 net_item = {} 

4327 expected_net_item = { 

4328 "use": "bridge", 

4329 "model": None, 

4330 } 

4331 self.ns._prepare_type_of_interface( 

4332 interface, tasks_by_target_record_id, net_text, net_item 

4333 ) 

4334 self.assertDictEqual(net_item, expected_net_item) 

4335 mock_deep_get.assert_not_called() 

4336 

4337 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces") 

4338 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security") 

4339 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface") 

4340 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface") 

4341 def test_prepare_vdu_interfaces( 

4342 self, 

4343 mock_type_of_interface, 

4344 mock_item_of_interface, 

4345 mock_port_security, 

4346 mock_vld_information_of_interface, 

4347 ): 

4348 """Prepare vdu interfaces successfully.""" 

4349 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4350 interface_1 = { 

4351 "name": "vdu-eth1", 

4352 "ns-vld-id": "net1", 

4353 "ip-address": "13.2.12.31", 

4354 "mgmt-interface": True, 

4355 } 

4356 interface_2 = { 

4357 "name": "vdu-eth2", 

4358 "vnf-vld-id": "net2", 

4359 "mac-address": "d0:94:66:ed:fc:e2", 

4360 } 

4361 interface_3 = { 

4362 "name": "vdu-eth3", 

4363 "ns-vld-id": "mgmtnet", 

4364 } 

4365 target_vdu["interfaces"] = [interface_1, interface_2, interface_3] 

4366 extra_dict = { 

4367 "params": "test_params", 

4368 "find_params": "test_find_params", 

4369 "depends_on": [], 

4370 } 

4371 

4372 net_text_1 = f"{ns_preffix}:net1" 

4373 net_text_2 = f"{vnf_preffix}:net2" 

4374 net_text_3 = f"{ns_preffix}:mgmtnet" 

4375 net_item_1 = { 

4376 "name": "vdu-eth1", 

4377 "net_id": f"TASK-{ns_preffix}", 

4378 "type": "virtual", 

4379 } 

4380 net_item_2 = { 

4381 "name": "vdu-eth2", 

4382 "net_id": f"TASK-{ns_preffix}", 

4383 "type": "virtual", 

4384 } 

4385 net_item_3 = { 

4386 "name": "vdu-eth3", 

4387 "net_id": f"TASK-{ns_preffix}", 

4388 "type": "virtual", 

4389 } 

4390 mock_item_of_interface.side_effect = [net_item_1, net_item_2, net_item_3] 

4391 mock_vld_information_of_interface.side_effect = [ 

4392 net_text_1, 

4393 net_text_2, 

4394 net_text_3, 

4395 ] 

4396 net_list = [] 

4397 expected_extra_dict = { 

4398 "params": "test_params", 

4399 "find_params": "test_find_params", 

4400 "depends_on": [net_text_1, net_text_2, net_text_3], 

4401 "mgmt_vdu_interface": 0, 

4402 } 

4403 updated_net_item1 = deepcopy(net_item_1) 

4404 updated_net_item1.update({"ip_address": "13.2.12.31"}) 

4405 updated_net_item2 = deepcopy(net_item_2) 

4406 updated_net_item2.update({"mac_address": "d0:94:66:ed:fc:e2"}) 

4407 expected_net_list = [updated_net_item1, updated_net_item2, net_item_3] 

4408 self.ns._prepare_vdu_interfaces( 

4409 target_vdu, 

4410 extra_dict, 

4411 ns_preffix, 

4412 vnf_preffix, 

4413 self.logger, 

4414 tasks_by_target_record_id, 

4415 net_list, 

4416 ) 

4417 _call_mock_vld_information_of_interface = ( 

4418 mock_vld_information_of_interface.call_args_list 

4419 ) 

4420 self.assertEqual( 

4421 _call_mock_vld_information_of_interface[0][0], 

4422 (interface_1, ns_preffix, vnf_preffix), 

4423 ) 

4424 self.assertEqual( 

4425 _call_mock_vld_information_of_interface[1][0], 

4426 (interface_2, ns_preffix, vnf_preffix), 

4427 ) 

4428 self.assertEqual( 

4429 _call_mock_vld_information_of_interface[2][0], 

4430 (interface_3, ns_preffix, vnf_preffix), 

4431 ) 

4432 

4433 _call_mock_port_security = mock_port_security.call_args_list 

4434 self.assertEqual(_call_mock_port_security[0].args[0], interface_1) 

4435 self.assertEqual(_call_mock_port_security[1].args[0], interface_2) 

4436 self.assertEqual(_call_mock_port_security[2].args[0], interface_3) 

4437 

4438 _call_mock_item_of_interface = mock_item_of_interface.call_args_list 

4439 self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1)) 

4440 self.assertEqual(_call_mock_item_of_interface[1][0], (interface_2, net_text_2)) 

4441 self.assertEqual(_call_mock_item_of_interface[2][0], (interface_3, net_text_3)) 

4442 

4443 _call_mock_type_of_interface = mock_type_of_interface.call_args_list 

4444 self.assertEqual( 

4445 _call_mock_type_of_interface[0][0], 

4446 (interface_1, tasks_by_target_record_id, net_text_1, net_item_1), 

4447 ) 

4448 self.assertEqual( 

4449 _call_mock_type_of_interface[1][0], 

4450 (interface_2, tasks_by_target_record_id, net_text_2, net_item_2), 

4451 ) 

4452 self.assertEqual( 

4453 _call_mock_type_of_interface[2][0], 

4454 (interface_3, tasks_by_target_record_id, net_text_3, net_item_3), 

4455 ) 

4456 self.assertEqual(net_list, expected_net_list) 

4457 self.assertEqual(extra_dict, expected_extra_dict) 

4458 self.logger.error.assert_not_called() 

4459 

4460 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces") 

4461 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security") 

4462 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface") 

4463 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface") 

4464 def test_prepare_vdu_interfaces_create_net_item_raise_exception( 

4465 self, 

4466 mock_type_of_interface, 

4467 mock_item_of_interface, 

4468 mock_port_security, 

4469 mock_vld_information_of_interface, 

4470 ): 

4471 """Prepare vdu interfaces, create_net_item_of_interface method raise exception.""" 

4472 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4473 interface_1 = { 

4474 "name": "vdu-eth1", 

4475 "ns-vld-id": "net1", 

4476 "ip-address": "13.2.12.31", 

4477 "mgmt-interface": True, 

4478 } 

4479 interface_2 = { 

4480 "name": "vdu-eth2", 

4481 "vnf-vld-id": "net2", 

4482 "mac-address": "d0:94:66:ed:fc:e2", 

4483 } 

4484 interface_3 = { 

4485 "name": "vdu-eth3", 

4486 "ns-vld-id": "mgmtnet", 

4487 } 

4488 target_vdu["interfaces"] = [interface_1, interface_2, interface_3] 

4489 extra_dict = { 

4490 "params": "test_params", 

4491 "find_params": "test_find_params", 

4492 "depends_on": [], 

4493 } 

4494 net_text_1 = f"{ns_preffix}:net1" 

4495 mock_item_of_interface.side_effect = [TypeError, TypeError, TypeError] 

4496 

4497 mock_vld_information_of_interface.side_effect = [net_text_1] 

4498 net_list = [] 

4499 expected_extra_dict = { 

4500 "params": "test_params", 

4501 "find_params": "test_find_params", 

4502 "depends_on": [net_text_1], 

4503 } 

4504 with self.assertRaises(TypeError): 

4505 self.ns._prepare_vdu_interfaces( 

4506 target_vdu, 

4507 extra_dict, 

4508 ns_preffix, 

4509 vnf_preffix, 

4510 self.logger, 

4511 tasks_by_target_record_id, 

4512 net_list, 

4513 ) 

4514 

4515 _call_mock_vld_information_of_interface = ( 

4516 mock_vld_information_of_interface.call_args_list 

4517 ) 

4518 self.assertEqual( 

4519 _call_mock_vld_information_of_interface[0][0], 

4520 (interface_1, ns_preffix, vnf_preffix), 

4521 ) 

4522 

4523 _call_mock_port_security = mock_port_security.call_args_list 

4524 self.assertEqual(_call_mock_port_security[0].args[0], interface_1) 

4525 

4526 _call_mock_item_of_interface = mock_item_of_interface.call_args_list 

4527 self.assertEqual(_call_mock_item_of_interface[0][0], (interface_1, net_text_1)) 

4528 

4529 mock_type_of_interface.assert_not_called() 

4530 self.logger.error.assert_not_called() 

4531 self.assertEqual(net_list, []) 

4532 self.assertEqual(extra_dict, expected_extra_dict) 

4533 

4534 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces") 

4535 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security") 

4536 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface") 

4537 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface") 

4538 def test_prepare_vdu_interfaces_vld_information_is_empty( 

4539 self, 

4540 mock_type_of_interface, 

4541 mock_item_of_interface, 

4542 mock_port_security, 

4543 mock_vld_information_of_interface, 

4544 ): 

4545 """Prepare vdu interfaces, check_vld_information_of_interface method returns empty result.""" 

4546 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4547 interface_1 = { 

4548 "name": "vdu-eth1", 

4549 "ns-vld-id": "net1", 

4550 "ip-address": "13.2.12.31", 

4551 "mgmt-interface": True, 

4552 } 

4553 interface_2 = { 

4554 "name": "vdu-eth2", 

4555 "vnf-vld-id": "net2", 

4556 "mac-address": "d0:94:66:ed:fc:e2", 

4557 } 

4558 interface_3 = { 

4559 "name": "vdu-eth3", 

4560 "ns-vld-id": "mgmtnet", 

4561 } 

4562 target_vdu["interfaces"] = [interface_1, interface_2, interface_3] 

4563 extra_dict = { 

4564 "params": "test_params", 

4565 "find_params": "test_find_params", 

4566 "depends_on": [], 

4567 } 

4568 mock_vld_information_of_interface.side_effect = ["", "", ""] 

4569 net_list = [] 

4570 self.ns._prepare_vdu_interfaces( 

4571 target_vdu, 

4572 extra_dict, 

4573 ns_preffix, 

4574 vnf_preffix, 

4575 self.logger, 

4576 tasks_by_target_record_id, 

4577 net_list, 

4578 ) 

4579 

4580 _call_mock_vld_information_of_interface = ( 

4581 mock_vld_information_of_interface.call_args_list 

4582 ) 

4583 self.assertEqual( 

4584 _call_mock_vld_information_of_interface[0][0], 

4585 (interface_1, ns_preffix, vnf_preffix), 

4586 ) 

4587 self.assertEqual( 

4588 _call_mock_vld_information_of_interface[1][0], 

4589 (interface_2, ns_preffix, vnf_preffix), 

4590 ) 

4591 self.assertEqual( 

4592 _call_mock_vld_information_of_interface[2][0], 

4593 (interface_3, ns_preffix, vnf_preffix), 

4594 ) 

4595 

4596 _call_logger = self.logger.error.call_args_list 

4597 self.assertEqual( 

4598 _call_logger[0][0], 

4599 ("Interface 0 from vdu several_volumes-VM not connected to any vld",), 

4600 ) 

4601 self.assertEqual( 

4602 _call_logger[1][0], 

4603 ("Interface 1 from vdu several_volumes-VM not connected to any vld",), 

4604 ) 

4605 self.assertEqual( 

4606 _call_logger[2][0], 

4607 ("Interface 2 from vdu several_volumes-VM not connected to any vld",), 

4608 ) 

4609 self.assertEqual(net_list, []) 

4610 self.assertEqual( 

4611 extra_dict, 

4612 { 

4613 "params": "test_params", 

4614 "find_params": "test_find_params", 

4615 "depends_on": [], 

4616 }, 

4617 ) 

4618 

4619 mock_item_of_interface.assert_not_called() 

4620 mock_port_security.assert_not_called() 

4621 mock_type_of_interface.assert_not_called() 

4622 

4623 @patch("osm_ng_ro.ns.Ns._check_vld_information_of_interfaces") 

4624 @patch("osm_ng_ro.ns.Ns._prepare_interface_port_security") 

4625 @patch("osm_ng_ro.ns.Ns._create_net_item_of_interface") 

4626 @patch("osm_ng_ro.ns.Ns._prepare_type_of_interface") 

4627 def test_prepare_vdu_interfaces_empty_interface_list( 

4628 self, 

4629 mock_type_of_interface, 

4630 mock_item_of_interface, 

4631 mock_port_security, 

4632 mock_vld_information_of_interface, 

4633 ): 

4634 """Prepare vdu interfaces, interface list is empty.""" 

4635 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4636 target_vdu["interfaces"] = [] 

4637 extra_dict = {} 

4638 net_list = [] 

4639 self.ns._prepare_vdu_interfaces( 

4640 target_vdu, 

4641 extra_dict, 

4642 ns_preffix, 

4643 vnf_preffix, 

4644 self.logger, 

4645 tasks_by_target_record_id, 

4646 net_list, 

4647 ) 

4648 mock_type_of_interface.assert_not_called() 

4649 mock_vld_information_of_interface.assert_not_called() 

4650 mock_item_of_interface.assert_not_called() 

4651 mock_port_security.assert_not_called() 

4652 

4653 def test_prepare_vdu_ssh_keys(self): 

4654 """Target_vdu has ssh-keys and ro_nsr_public_key exists.""" 

4655 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4656 target_vdu["ssh-keys"] = ["sample-ssh-key"] 

4657 ro_nsr_public_key = {"public_key": "path_of_public_key"} 

4658 target_vdu["ssh-access-required"] = True 

4659 cloud_config = {} 

4660 expected_cloud_config = { 

4661 "key-pairs": ["sample-ssh-key", {"public_key": "path_of_public_key"}] 

4662 } 

4663 self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config) 

4664 self.assertDictEqual(cloud_config, expected_cloud_config) 

4665 

4666 def test_prepare_vdu_ssh_keys_target_vdu_wthout_ssh_keys(self): 

4667 """Target_vdu does not have ssh-keys.""" 

4668 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4669 ro_nsr_public_key = {"public_key": "path_of_public_key"} 

4670 target_vdu["ssh-access-required"] = True 

4671 cloud_config = {} 

4672 expected_cloud_config = {"key-pairs": [{"public_key": "path_of_public_key"}]} 

4673 self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config) 

4674 self.assertDictEqual(cloud_config, expected_cloud_config) 

4675 

4676 def test_prepare_vdu_ssh_keys_ssh_access_is_not_required(self): 

4677 """Target_vdu has ssh-keys, ssh-access is not required.""" 

4678 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4679 target_vdu["ssh-keys"] = ["sample-ssh-key"] 

4680 ro_nsr_public_key = {"public_key": "path_of_public_key"} 

4681 target_vdu["ssh-access-required"] = False 

4682 cloud_config = {} 

4683 expected_cloud_config = {"key-pairs": ["sample-ssh-key"]} 

4684 self.ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config) 

4685 self.assertDictEqual(cloud_config, expected_cloud_config) 

4686 

4687 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk") 

4688 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4689 def test_add_persistent_root_disk_to_disk_list_keep_false( 

4690 self, mock_volume_keeping_required, mock_select_persistent_root_disk 

4691 ): 

4692 """Add persistent root disk to disk_list, keep volume set to False.""" 

4693 root_disk = { 

4694 "id": "persistent-root-volume", 

4695 "type-of-storage": "persistent-storage:persistent-storage", 

4696 "size-of-storage": "10", 

4697 } 

4698 mock_select_persistent_root_disk.return_value = root_disk 

4699 vnfd = deepcopy(vnfd_wth_persistent_storage) 

4700 vnfd["virtual-storage-desc"][1] = root_disk 

4701 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4702 persistent_root_disk = {} 

4703 disk_list = [] 

4704 mock_volume_keeping_required.return_value = False 

4705 expected_disk_list = [ 

4706 { 

4707 "image_id": "ubuntu20.04", 

4708 "size": "10", 

4709 "keep": False, 

4710 } 

4711 ] 

4712 self.ns._add_persistent_root_disk_to_disk_list( 

4713 vnfd, target_vdu, persistent_root_disk, disk_list 

4714 ) 

4715 self.assertEqual(disk_list, expected_disk_list) 

4716 mock_select_persistent_root_disk.assert_called_once() 

4717 mock_volume_keeping_required.assert_called_once() 

4718 

4719 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk") 

4720 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4721 def test_add_persistent_root_disk_to_disk_list_select_persistent_root_disk_raises( 

4722 self, mock_volume_keeping_required, mock_select_persistent_root_disk 

4723 ): 

4724 """Add persistent root disk to disk_list""" 

4725 root_disk = { 

4726 "id": "persistent-root-volume", 

4727 "type-of-storage": "persistent-storage:persistent-storage", 

4728 "size-of-storage": "10", 

4729 } 

4730 mock_select_persistent_root_disk.side_effect = AttributeError 

4731 vnfd = deepcopy(vnfd_wth_persistent_storage) 

4732 vnfd["virtual-storage-desc"][1] = root_disk 

4733 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4734 persistent_root_disk = {} 

4735 disk_list = [] 

4736 with self.assertRaises(AttributeError): 

4737 self.ns._add_persistent_root_disk_to_disk_list( 

4738 vnfd, target_vdu, persistent_root_disk, disk_list 

4739 ) 

4740 self.assertEqual(disk_list, []) 

4741 mock_select_persistent_root_disk.assert_called_once() 

4742 mock_volume_keeping_required.assert_not_called() 

4743 

4744 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk") 

4745 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4746 def test_add_persistent_root_disk_to_disk_list_keep_true( 

4747 self, mock_volume_keeping_required, mock_select_persistent_root_disk 

4748 ): 

4749 """Add persistent root disk, keeo volume set to True.""" 

4750 vnfd = deepcopy(vnfd_wth_persistent_storage) 

4751 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4752 mock_volume_keeping_required.return_value = True 

4753 root_disk = { 

4754 "id": "persistent-root-volume", 

4755 "type-of-storage": "persistent-storage:persistent-storage", 

4756 "size-of-storage": "10", 

4757 "vdu-storage-requirements": [ 

4758 {"key": "keep-volume", "value": "true"}, 

4759 ], 

4760 } 

4761 mock_select_persistent_root_disk.return_value = root_disk 

4762 persistent_root_disk = {} 

4763 disk_list = [] 

4764 expected_disk_list = [ 

4765 { 

4766 "image_id": "ubuntu20.04", 

4767 "size": "10", 

4768 "keep": True, 

4769 } 

4770 ] 

4771 self.ns._add_persistent_root_disk_to_disk_list( 

4772 vnfd, target_vdu, persistent_root_disk, disk_list 

4773 ) 

4774 self.assertEqual(disk_list, expected_disk_list) 

4775 mock_volume_keeping_required.assert_called_once_with(root_disk) 

4776 

4777 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4778 def test_add_persistent_ordinary_disk_to_disk_list( 

4779 self, mock_volume_keeping_required 

4780 ): 

4781 """Add persistent ordinary disk, keeo volume set to True.""" 

4782 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4783 mock_volume_keeping_required.return_value = False 

4784 persistent_root_disk = { 

4785 "persistent-root-volume": { 

4786 "image_id": "ubuntu20.04", 

4787 "size": "10", 

4788 "keep": True, 

4789 } 

4790 } 

4791 ordinary_disk = { 

4792 "id": "persistent-volume2", 

4793 "type-of-storage": "persistent-storage:persistent-storage", 

4794 "size-of-storage": "10", 

4795 } 

4796 persistent_ordinary_disk = {} 

4797 disk_list = [] 

4798 extra_dict = {} 

4799 expected_disk_list = [ 

4800 { 

4801 "size": "10", 

4802 "keep": False, 

4803 "multiattach": False, 

4804 "name": "persistent-volume2", 

4805 } 

4806 ] 

4807 self.ns._add_persistent_ordinary_disks_to_disk_list( 

4808 target_vdu, 

4809 persistent_root_disk, 

4810 persistent_ordinary_disk, 

4811 disk_list, 

4812 extra_dict, 

4813 ) 

4814 self.assertEqual(disk_list, expected_disk_list) 

4815 mock_volume_keeping_required.assert_called_once_with(ordinary_disk) 

4816 

4817 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4818 def test_add_persistent_ordinary_disk_to_disk_list_vsd_id_in_root_disk_dict( 

4819 self, mock_volume_keeping_required 

4820 ): 

4821 """Add persistent ordinary disk, vsd id is in root_disk dict.""" 

4822 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4823 mock_volume_keeping_required.return_value = False 

4824 persistent_root_disk = { 

4825 "persistent-root-volume": { 

4826 "image_id": "ubuntu20.04", 

4827 "size": "10", 

4828 "keep": True, 

4829 }, 

4830 "persistent-volume2": { 

4831 "size": "10", 

4832 }, 

4833 } 

4834 persistent_ordinary_disk = {} 

4835 disk_list = [] 

4836 extra_dict = {} 

4837 

4838 self.ns._add_persistent_ordinary_disks_to_disk_list( 

4839 target_vdu, 

4840 persistent_root_disk, 

4841 persistent_ordinary_disk, 

4842 disk_list, 

4843 extra_dict, 

4844 ) 

4845 self.assertEqual(disk_list, []) 

4846 mock_volume_keeping_required.assert_not_called() 

4847 

4848 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk") 

4849 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4850 def test_add_persistent_root_disk_to_disk_list_vnfd_wthout_persistent_storage( 

4851 self, mock_volume_keeping_required, mock_select_persistent_root_disk 

4852 ): 

4853 """VNFD does not have persistent storage.""" 

4854 vnfd = deepcopy(vnfd_wthout_persistent_storage) 

4855 target_vdu = deepcopy(target_vdu_wthout_persistent_storage) 

4856 mock_select_persistent_root_disk.return_value = None 

4857 persistent_root_disk = {} 

4858 disk_list = [] 

4859 self.ns._add_persistent_root_disk_to_disk_list( 

4860 vnfd, target_vdu, persistent_root_disk, disk_list 

4861 ) 

4862 self.assertEqual(disk_list, []) 

4863 self.assertEqual(mock_select_persistent_root_disk.call_count, 2) 

4864 mock_volume_keeping_required.assert_not_called() 

4865 

4866 @patch("osm_ng_ro.ns.Ns._select_persistent_root_disk") 

4867 @patch("osm_ng_ro.ns.Ns.is_volume_keeping_required") 

4868 def test_add_persistent_root_disk_to_disk_list_wthout_persistent_root_disk( 

4869 self, mock_volume_keeping_required, mock_select_persistent_root_disk 

4870 ): 

4871 """Persistent_root_disk dict is empty.""" 

4872 vnfd = deepcopy(vnfd_wthout_persistent_storage) 

4873 target_vdu = deepcopy(target_vdu_wthout_persistent_storage) 

4874 mock_select_persistent_root_disk.return_value = None 

4875 persistent_root_disk = {} 

4876 disk_list = [] 

4877 self.ns._add_persistent_root_disk_to_disk_list( 

4878 vnfd, target_vdu, persistent_root_disk, disk_list 

4879 ) 

4880 self.assertEqual(disk_list, []) 

4881 self.assertEqual(mock_select_persistent_root_disk.call_count, 2) 

4882 mock_volume_keeping_required.assert_not_called() 

4883 

4884 def test_prepare_vdu_affinity_group_list_invalid_extra_dict(self): 

4885 """Invalid extra dict.""" 

4886 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4887 target_vdu["affinity-or-anti-affinity-group-id"] = "sample_affinity-group-id" 

4888 extra_dict = {} 

4889 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3" 

4890 with self.assertRaises(NsException) as err: 

4891 self.ns._prepare_vdu_affinity_group_list(target_vdu, extra_dict, ns_preffix) 

4892 self.assertEqual(str(err.exception), "Invalid extra_dict format.") 

4893 

4894 def test_prepare_vdu_affinity_group_list_one_affinity_group(self): 

4895 """There is one affinity-group.""" 

4896 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4897 target_vdu["affinity-or-anti-affinity-group-id"] = ["sample_affinity-group-id"] 

4898 extra_dict = {"depends_on": []} 

4899 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3" 

4900 affinity_group_txt = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.sample_affinity-group-id" 

4901 expected_result = [{"affinity_group_id": "TASK-" + affinity_group_txt}] 

4902 expected_extra_dict = {"depends_on": [affinity_group_txt]} 

4903 result = self.ns._prepare_vdu_affinity_group_list( 

4904 target_vdu, extra_dict, ns_preffix 

4905 ) 

4906 self.assertDictEqual(extra_dict, expected_extra_dict) 

4907 self.assertEqual(result, expected_result) 

4908 

4909 def test_prepare_vdu_affinity_group_list_several_affinity_groups(self): 

4910 """There are two affinity-groups.""" 

4911 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4912 target_vdu["affinity-or-anti-affinity-group-id"] = [ 

4913 "affinity-group-id1", 

4914 "affinity-group-id2", 

4915 ] 

4916 extra_dict = {"depends_on": []} 

4917 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3" 

4918 affinity_group_txt1 = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id1" 

4919 affinity_group_txt2 = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3:affinity-or-anti-affinity-group.affinity-group-id2" 

4920 expected_result = [ 

4921 {"affinity_group_id": "TASK-" + affinity_group_txt1}, 

4922 {"affinity_group_id": "TASK-" + affinity_group_txt2}, 

4923 ] 

4924 expected_extra_dict = {"depends_on": [affinity_group_txt1, affinity_group_txt2]} 

4925 result = self.ns._prepare_vdu_affinity_group_list( 

4926 target_vdu, extra_dict, ns_preffix 

4927 ) 

4928 self.assertDictEqual(extra_dict, expected_extra_dict) 

4929 self.assertEqual(result, expected_result) 

4930 

4931 def test_prepare_vdu_affinity_group_list_no_affinity_group(self): 

4932 """There is not any affinity-group.""" 

4933 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4934 extra_dict = {"depends_on": []} 

4935 ns_preffix = "nsrs:th47f48-9870-4169-b758-9732e1ff40f3" 

4936 result = self.ns._prepare_vdu_affinity_group_list( 

4937 target_vdu, extra_dict, ns_preffix 

4938 ) 

4939 self.assertDictEqual(extra_dict, {"depends_on": []}) 

4940 self.assertEqual(result, []) 

4941 

4942 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

4943 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

4944 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

4945 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

4946 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

4947 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

4948 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

4949 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

4950 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

4951 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

4952 def test_process_vdu_params_with_inst_vol_list( 

4953 self, 

4954 mock_prepare_vdu_affinity_group_list, 

4955 mock_add_persistent_ordinary_disks_to_disk_list, 

4956 mock_add_persistent_root_disk_to_disk_list, 

4957 mock_find_persistent_volumes, 

4958 mock_find_persistent_root_volumes, 

4959 mock_prepare_vdu_ssh_keys, 

4960 mock_prepare_vdu_cloud_init, 

4961 mock_prepare_vdu_interfaces, 

4962 mock_locate_vdu_interfaces, 

4963 mock_sort_vdu_interfaces, 

4964 ): 

4965 """Instantiation volume list is empty.""" 

4966 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

4967 

4968 target_vdu["interfaces"] = interfaces_wth_all_positions 

4969 

4970 vdu_instantiation_vol_list = [ 

4971 { 

4972 "vim-volume-id": vim_volume_id, 

4973 "name": "persistent-volume2", 

4974 } 

4975 ] 

4976 target_vdu["additionalParams"] = { 

4977 "OSM": {"vdu_volumes": vdu_instantiation_vol_list} 

4978 } 

4979 mock_prepare_vdu_cloud_init.return_value = {} 

4980 mock_prepare_vdu_affinity_group_list.return_value = [] 

4981 persistent_root_disk = { 

4982 "persistent-root-volume": { 

4983 "image_id": "ubuntu20.04", 

4984 "size": "10", 

4985 } 

4986 } 

4987 mock_find_persistent_root_volumes.return_value = persistent_root_disk 

4988 

4989 new_kwargs = deepcopy(kwargs) 

4990 new_kwargs.update( 

4991 { 

4992 "vnfr_id": vnfr_id, 

4993 "nsr_id": nsr_id, 

4994 "tasks_by_target_record_id": {}, 

4995 "logger": "logger", 

4996 } 

4997 ) 

4998 expected_extra_dict_copy = deepcopy(expected_extra_dict) 

4999 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5000 db.get_one.return_value = vnfd 

5001 result = Ns._process_vdu_params( 

5002 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5003 ) 

5004 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu) 

5005 mock_locate_vdu_interfaces.assert_not_called() 

5006 mock_prepare_vdu_cloud_init.assert_called_once() 

5007 mock_add_persistent_root_disk_to_disk_list.assert_not_called() 

5008 mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called() 

5009 mock_prepare_vdu_interfaces.assert_called_once_with( 

5010 target_vdu, 

5011 expected_extra_dict_copy, 

5012 ns_preffix, 

5013 vnf_preffix, 

5014 "logger", 

5015 {}, 

5016 [], 

5017 ) 

5018 self.assertDictEqual(result, expected_extra_dict_copy) 

5019 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {}) 

5020 mock_prepare_vdu_affinity_group_list.assert_called_once() 

5021 mock_find_persistent_volumes.assert_called_once_with( 

5022 persistent_root_disk, target_vdu, vdu_instantiation_vol_list, [] 

5023 ) 

5024 

5025 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5026 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5027 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5028 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5029 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5030 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5031 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5032 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5033 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5034 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5035 def test_process_vdu_params_wth_affinity_groups( 

5036 self, 

5037 mock_prepare_vdu_affinity_group_list, 

5038 mock_add_persistent_ordinary_disks_to_disk_list, 

5039 mock_add_persistent_root_disk_to_disk_list, 

5040 mock_find_persistent_volumes, 

5041 mock_find_persistent_root_volumes, 

5042 mock_prepare_vdu_ssh_keys, 

5043 mock_prepare_vdu_cloud_init, 

5044 mock_prepare_vdu_interfaces, 

5045 mock_locate_vdu_interfaces, 

5046 mock_sort_vdu_interfaces, 

5047 ): 

5048 """There is cloud-config.""" 

5049 target_vdu = deepcopy(target_vdu_wthout_persistent_storage) 

5050 

5051 self.maxDiff = None 

5052 target_vdu["interfaces"] = interfaces_wth_all_positions 

5053 mock_prepare_vdu_cloud_init.return_value = {} 

5054 mock_prepare_vdu_affinity_group_list.return_value = [ 

5055 "affinity_group_1", 

5056 "affinity_group_2", 

5057 ] 

5058 

5059 new_kwargs = deepcopy(kwargs) 

5060 new_kwargs.update( 

5061 { 

5062 "vnfr_id": vnfr_id, 

5063 "nsr_id": nsr_id, 

5064 "tasks_by_target_record_id": {}, 

5065 "logger": "logger", 

5066 } 

5067 ) 

5068 expected_extra_dict3 = deepcopy(expected_extra_dict2) 

5069 expected_extra_dict3["params"]["affinity_group_list"] = [ 

5070 "affinity_group_1", 

5071 "affinity_group_2", 

5072 ] 

5073 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5074 db.get_one.return_value = vnfd 

5075 result = Ns._process_vdu_params( 

5076 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5077 ) 

5078 self.assertDictEqual(result, expected_extra_dict3) 

5079 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu) 

5080 mock_locate_vdu_interfaces.assert_not_called() 

5081 mock_prepare_vdu_cloud_init.assert_called_once() 

5082 mock_add_persistent_root_disk_to_disk_list.assert_called_once() 

5083 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once() 

5084 mock_prepare_vdu_interfaces.assert_called_once_with( 

5085 target_vdu, 

5086 expected_extra_dict3, 

5087 ns_preffix, 

5088 vnf_preffix, 

5089 "logger", 

5090 {}, 

5091 [], 

5092 ) 

5093 

5094 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {}) 

5095 mock_prepare_vdu_affinity_group_list.assert_called_once() 

5096 mock_find_persistent_volumes.assert_not_called() 

5097 

5098 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5099 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5100 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5101 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5102 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5103 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5104 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5105 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5106 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5107 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5108 def test_process_vdu_params_wth_cloud_config( 

5109 self, 

5110 mock_prepare_vdu_affinity_group_list, 

5111 mock_add_persistent_ordinary_disks_to_disk_list, 

5112 mock_add_persistent_root_disk_to_disk_list, 

5113 mock_find_persistent_volumes, 

5114 mock_find_persistent_root_volumes, 

5115 mock_prepare_vdu_ssh_keys, 

5116 mock_prepare_vdu_cloud_init, 

5117 mock_prepare_vdu_interfaces, 

5118 mock_locate_vdu_interfaces, 

5119 mock_sort_vdu_interfaces, 

5120 ): 

5121 """There is cloud-config.""" 

5122 target_vdu = deepcopy(target_vdu_wthout_persistent_storage) 

5123 

5124 self.maxDiff = None 

5125 target_vdu["interfaces"] = interfaces_wth_all_positions 

5126 mock_prepare_vdu_cloud_init.return_value = { 

5127 "user-data": user_data, 

5128 "boot-data-drive": "vda", 

5129 } 

5130 mock_prepare_vdu_affinity_group_list.return_value = [] 

5131 

5132 new_kwargs = deepcopy(kwargs) 

5133 new_kwargs.update( 

5134 { 

5135 "vnfr_id": vnfr_id, 

5136 "nsr_id": nsr_id, 

5137 "tasks_by_target_record_id": {}, 

5138 "logger": "logger", 

5139 } 

5140 ) 

5141 expected_extra_dict3 = deepcopy(expected_extra_dict2) 

5142 expected_extra_dict3["params"]["cloud_config"] = { 

5143 "user-data": user_data, 

5144 "boot-data-drive": "vda", 

5145 } 

5146 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5147 db.get_one.return_value = vnfd 

5148 result = Ns._process_vdu_params( 

5149 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5150 ) 

5151 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu) 

5152 mock_locate_vdu_interfaces.assert_not_called() 

5153 mock_prepare_vdu_cloud_init.assert_called_once() 

5154 mock_add_persistent_root_disk_to_disk_list.assert_called_once() 

5155 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once() 

5156 mock_prepare_vdu_interfaces.assert_called_once_with( 

5157 target_vdu, 

5158 expected_extra_dict3, 

5159 ns_preffix, 

5160 vnf_preffix, 

5161 "logger", 

5162 {}, 

5163 [], 

5164 ) 

5165 self.assertDictEqual(result, expected_extra_dict3) 

5166 mock_prepare_vdu_ssh_keys.assert_called_once_with( 

5167 target_vdu, None, {"user-data": user_data, "boot-data-drive": "vda"} 

5168 ) 

5169 mock_prepare_vdu_affinity_group_list.assert_called_once() 

5170 mock_find_persistent_volumes.assert_not_called() 

5171 

5172 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5173 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5174 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5175 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5176 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5177 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5178 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5179 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5180 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5181 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5182 def test_process_vdu_params_wthout_persistent_storage( 

5183 self, 

5184 mock_prepare_vdu_affinity_group_list, 

5185 mock_add_persistent_ordinary_disks_to_disk_list, 

5186 mock_add_persistent_root_disk_to_disk_list, 

5187 mock_find_persistent_volumes, 

5188 mock_find_persistent_root_volumes, 

5189 mock_prepare_vdu_ssh_keys, 

5190 mock_prepare_vdu_cloud_init, 

5191 mock_prepare_vdu_interfaces, 

5192 mock_locate_vdu_interfaces, 

5193 mock_sort_vdu_interfaces, 

5194 ): 

5195 """There is not any persistent storage.""" 

5196 target_vdu = deepcopy(target_vdu_wthout_persistent_storage) 

5197 

5198 self.maxDiff = None 

5199 target_vdu["interfaces"] = interfaces_wth_all_positions 

5200 mock_prepare_vdu_cloud_init.return_value = {} 

5201 mock_prepare_vdu_affinity_group_list.return_value = [] 

5202 

5203 new_kwargs = deepcopy(kwargs) 

5204 new_kwargs.update( 

5205 { 

5206 "vnfr_id": vnfr_id, 

5207 "nsr_id": nsr_id, 

5208 "tasks_by_target_record_id": {}, 

5209 "logger": "logger", 

5210 } 

5211 ) 

5212 expected_extra_dict_copy = deepcopy(expected_extra_dict2) 

5213 vnfd = deepcopy(vnfd_wthout_persistent_storage) 

5214 db.get_one.return_value = vnfd 

5215 result = Ns._process_vdu_params( 

5216 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5217 ) 

5218 mock_sort_vdu_interfaces.assert_called_once_with(target_vdu) 

5219 mock_locate_vdu_interfaces.assert_not_called() 

5220 mock_prepare_vdu_cloud_init.assert_called_once() 

5221 mock_add_persistent_root_disk_to_disk_list.assert_called_once() 

5222 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once() 

5223 mock_prepare_vdu_interfaces.assert_called_once_with( 

5224 target_vdu, 

5225 expected_extra_dict_copy, 

5226 ns_preffix, 

5227 vnf_preffix, 

5228 "logger", 

5229 {}, 

5230 [], 

5231 ) 

5232 self.assertDictEqual(result, expected_extra_dict_copy) 

5233 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {}) 

5234 mock_prepare_vdu_affinity_group_list.assert_called_once() 

5235 mock_find_persistent_volumes.assert_not_called() 

5236 

5237 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5238 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5239 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5240 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5241 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5242 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5243 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5244 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5245 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5246 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5247 def test_process_vdu_params_interfaces_partially_located( 

5248 self, 

5249 mock_prepare_vdu_affinity_group_list, 

5250 mock_add_persistent_ordinary_disks_to_disk_list, 

5251 mock_add_persistent_root_disk_to_disk_list, 

5252 mock_find_persistent_volumes, 

5253 mock_find_persistent_root_volumes, 

5254 mock_prepare_vdu_ssh_keys, 

5255 mock_prepare_vdu_cloud_init, 

5256 mock_prepare_vdu_interfaces, 

5257 mock_locate_vdu_interfaces, 

5258 mock_sort_vdu_interfaces, 

5259 ): 

5260 """Some interfaces have position.""" 

5261 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

5262 

5263 self.maxDiff = None 

5264 target_vdu["interfaces"] = [ 

5265 { 

5266 "name": "vdu-eth1", 

5267 "ns-vld-id": "net1", 

5268 }, 

5269 {"name": "vdu-eth2", "ns-vld-id": "net2", "position": 2}, 

5270 { 

5271 "name": "vdu-eth3", 

5272 "ns-vld-id": "mgmtnet", 

5273 }, 

5274 ] 

5275 mock_prepare_vdu_cloud_init.return_value = {} 

5276 mock_prepare_vdu_affinity_group_list.return_value = [] 

5277 persistent_root_disk = { 

5278 "persistent-root-volume": { 

5279 "image_id": "ubuntu20.04", 

5280 "size": "10", 

5281 "keep": True, 

5282 } 

5283 } 

5284 mock_find_persistent_root_volumes.return_value = persistent_root_disk 

5285 

5286 new_kwargs = deepcopy(kwargs) 

5287 new_kwargs.update( 

5288 { 

5289 "vnfr_id": vnfr_id, 

5290 "nsr_id": nsr_id, 

5291 "tasks_by_target_record_id": {}, 

5292 "logger": "logger", 

5293 } 

5294 ) 

5295 

5296 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5297 db.get_one.return_value = vnfd 

5298 result = Ns._process_vdu_params( 

5299 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5300 ) 

5301 expected_extra_dict_copy = deepcopy(expected_extra_dict) 

5302 mock_sort_vdu_interfaces.assert_not_called() 

5303 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu) 

5304 mock_prepare_vdu_cloud_init.assert_called_once() 

5305 mock_add_persistent_root_disk_to_disk_list.assert_called_once() 

5306 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once() 

5307 mock_prepare_vdu_interfaces.assert_called_once_with( 

5308 target_vdu, 

5309 expected_extra_dict_copy, 

5310 ns_preffix, 

5311 vnf_preffix, 

5312 "logger", 

5313 {}, 

5314 [], 

5315 ) 

5316 self.assertDictEqual(result, expected_extra_dict_copy) 

5317 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {}) 

5318 mock_prepare_vdu_affinity_group_list.assert_called_once() 

5319 mock_find_persistent_volumes.assert_not_called() 

5320 mock_find_persistent_root_volumes.assert_not_called() 

5321 

5322 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5323 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5324 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5325 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5326 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5327 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5328 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5329 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5330 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5331 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5332 def test_process_vdu_params_no_interface_position( 

5333 self, 

5334 mock_prepare_vdu_affinity_group_list, 

5335 mock_add_persistent_ordinary_disks_to_disk_list, 

5336 mock_add_persistent_root_disk_to_disk_list, 

5337 mock_find_persistent_volumes, 

5338 mock_find_persistent_root_volumes, 

5339 mock_prepare_vdu_ssh_keys, 

5340 mock_prepare_vdu_cloud_init, 

5341 mock_prepare_vdu_interfaces, 

5342 mock_locate_vdu_interfaces, 

5343 mock_sort_vdu_interfaces, 

5344 ): 

5345 """Interfaces do not have position.""" 

5346 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

5347 

5348 self.maxDiff = None 

5349 target_vdu["interfaces"] = interfaces_wthout_positions 

5350 mock_prepare_vdu_cloud_init.return_value = {} 

5351 mock_prepare_vdu_affinity_group_list.return_value = [] 

5352 persistent_root_disk = { 

5353 "persistent-root-volume": { 

5354 "image_id": "ubuntu20.04", 

5355 "size": "10", 

5356 "keep": True, 

5357 } 

5358 } 

5359 mock_find_persistent_root_volumes.return_value = persistent_root_disk 

5360 new_kwargs = deepcopy(kwargs) 

5361 new_kwargs.update( 

5362 { 

5363 "vnfr_id": vnfr_id, 

5364 "nsr_id": nsr_id, 

5365 "tasks_by_target_record_id": {}, 

5366 "logger": "logger", 

5367 } 

5368 ) 

5369 

5370 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5371 db.get_one.return_value = vnfd 

5372 result = Ns._process_vdu_params( 

5373 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5374 ) 

5375 expected_extra_dict_copy = deepcopy(expected_extra_dict) 

5376 mock_sort_vdu_interfaces.assert_not_called() 

5377 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu) 

5378 mock_prepare_vdu_cloud_init.assert_called_once() 

5379 mock_add_persistent_root_disk_to_disk_list.assert_called_once() 

5380 mock_add_persistent_ordinary_disks_to_disk_list.assert_called_once() 

5381 mock_prepare_vdu_interfaces.assert_called_once_with( 

5382 target_vdu, 

5383 expected_extra_dict_copy, 

5384 ns_preffix, 

5385 vnf_preffix, 

5386 "logger", 

5387 {}, 

5388 [], 

5389 ) 

5390 self.assertDictEqual(result, expected_extra_dict_copy) 

5391 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {}) 

5392 mock_prepare_vdu_affinity_group_list.assert_called_once() 

5393 mock_find_persistent_volumes.assert_not_called() 

5394 mock_find_persistent_root_volumes.assert_not_called() 

5395 

5396 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5397 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5398 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5399 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5400 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5401 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5402 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5403 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5404 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5405 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5406 def test_process_vdu_params_prepare_vdu_interfaces_raises_exception( 

5407 self, 

5408 mock_prepare_vdu_affinity_group_list, 

5409 mock_add_persistent_ordinary_disks_to_disk_list, 

5410 mock_add_persistent_root_disk_to_disk_list, 

5411 mock_find_persistent_volumes, 

5412 mock_find_persistent_root_volumes, 

5413 mock_prepare_vdu_ssh_keys, 

5414 mock_prepare_vdu_cloud_init, 

5415 mock_prepare_vdu_interfaces, 

5416 mock_locate_vdu_interfaces, 

5417 mock_sort_vdu_interfaces, 

5418 ): 

5419 """Prepare vdu interfaces method raises exception.""" 

5420 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

5421 

5422 self.maxDiff = None 

5423 target_vdu["interfaces"] = interfaces_wthout_positions 

5424 mock_prepare_vdu_cloud_init.return_value = {} 

5425 mock_prepare_vdu_affinity_group_list.return_value = [] 

5426 persistent_root_disk = { 

5427 "persistent-root-volume": { 

5428 "image_id": "ubuntu20.04", 

5429 "size": "10", 

5430 "keep": True, 

5431 } 

5432 } 

5433 mock_find_persistent_root_volumes.return_value = persistent_root_disk 

5434 new_kwargs = deepcopy(kwargs) 

5435 new_kwargs.update( 

5436 { 

5437 "vnfr_id": vnfr_id, 

5438 "nsr_id": nsr_id, 

5439 "tasks_by_target_record_id": {}, 

5440 "logger": "logger", 

5441 } 

5442 ) 

5443 mock_prepare_vdu_interfaces.side_effect = TypeError 

5444 

5445 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5446 db.get_one.return_value = vnfd 

5447 with self.assertRaises(Exception) as err: 

5448 Ns._process_vdu_params( 

5449 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5450 ) 

5451 self.assertEqual(type(err), TypeError) 

5452 mock_sort_vdu_interfaces.assert_not_called() 

5453 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu) 

5454 mock_prepare_vdu_cloud_init.assert_not_called() 

5455 mock_add_persistent_root_disk_to_disk_list.assert_not_called() 

5456 mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called() 

5457 mock_prepare_vdu_interfaces.assert_called_once() 

5458 mock_prepare_vdu_ssh_keys.assert_not_called() 

5459 mock_prepare_vdu_affinity_group_list.assert_not_called() 

5460 mock_find_persistent_volumes.assert_not_called() 

5461 mock_find_persistent_root_volumes.assert_not_called() 

5462 

5463 @patch("osm_ng_ro.ns.Ns._sort_vdu_interfaces") 

5464 @patch("osm_ng_ro.ns.Ns._partially_locate_vdu_interfaces") 

5465 @patch("osm_ng_ro.ns.Ns._prepare_vdu_interfaces") 

5466 @patch("osm_ng_ro.ns.Ns._prepare_vdu_cloud_init") 

5467 @patch("osm_ng_ro.ns.Ns._prepare_vdu_ssh_keys") 

5468 @patch("osm_ng_ro.ns.Ns.find_persistent_root_volumes") 

5469 @patch("osm_ng_ro.ns.Ns.find_persistent_volumes") 

5470 @patch("osm_ng_ro.ns.Ns._add_persistent_root_disk_to_disk_list") 

5471 @patch("osm_ng_ro.ns.Ns._add_persistent_ordinary_disks_to_disk_list") 

5472 @patch("osm_ng_ro.ns.Ns._prepare_vdu_affinity_group_list") 

5473 def test_process_vdu_params_add_persistent_root_disk_raises_exception( 

5474 self, 

5475 mock_prepare_vdu_affinity_group_list, 

5476 mock_add_persistent_ordinary_disks_to_disk_list, 

5477 mock_add_persistent_root_disk_to_disk_list, 

5478 mock_find_persistent_volumes, 

5479 mock_find_persistent_root_volumes, 

5480 mock_prepare_vdu_ssh_keys, 

5481 mock_prepare_vdu_cloud_init, 

5482 mock_prepare_vdu_interfaces, 

5483 mock_locate_vdu_interfaces, 

5484 mock_sort_vdu_interfaces, 

5485 ): 

5486 """Add persistent root disk method raises exception.""" 

5487 target_vdu = deepcopy(target_vdu_wth_persistent_storage) 

5488 

5489 self.maxDiff = None 

5490 target_vdu["interfaces"] = interfaces_wthout_positions 

5491 mock_prepare_vdu_cloud_init.return_value = {} 

5492 mock_prepare_vdu_affinity_group_list.return_value = [] 

5493 mock_add_persistent_root_disk_to_disk_list.side_effect = KeyError 

5494 new_kwargs = deepcopy(kwargs) 

5495 new_kwargs.update( 

5496 { 

5497 "vnfr_id": vnfr_id, 

5498 "nsr_id": nsr_id, 

5499 "tasks_by_target_record_id": {}, 

5500 "logger": "logger", 

5501 } 

5502 ) 

5503 

5504 vnfd = deepcopy(vnfd_wth_persistent_storage) 

5505 db.get_one.return_value = vnfd 

5506 with self.assertRaises(Exception) as err: 

5507 Ns._process_vdu_params( 

5508 target_vdu, indata, vim_info=None, target_record_id=None, **new_kwargs 

5509 ) 

5510 self.assertEqual(type(err), KeyError) 

5511 mock_sort_vdu_interfaces.assert_not_called() 

5512 mock_locate_vdu_interfaces.assert_called_once_with(target_vdu) 

5513 mock_prepare_vdu_cloud_init.assert_called_once() 

5514 mock_add_persistent_root_disk_to_disk_list.assert_called_once() 

5515 mock_add_persistent_ordinary_disks_to_disk_list.assert_not_called() 

5516 mock_prepare_vdu_interfaces.assert_called_once_with( 

5517 target_vdu, 

5518 { 

5519 "depends_on": [ 

5520 f"{ns_preffix}:image.0", 

5521 f"{ns_preffix}:flavor.0", 

5522 ] 

5523 }, 

5524 ns_preffix, 

5525 vnf_preffix, 

5526 "logger", 

5527 {}, 

5528 [], 

5529 ) 

5530 

5531 mock_prepare_vdu_ssh_keys.assert_called_once_with(target_vdu, None, {}) 

5532 mock_prepare_vdu_affinity_group_list.assert_not_called() 

5533 mock_find_persistent_volumes.assert_not_called() 

5534 mock_find_persistent_root_volumes.assert_not_called() 

5535 

5536 def test_select_persistent_root_disk(self): 

5537 vdu = deepcopy(target_vdu_wth_persistent_storage) 

5538 vdu["virtual-storage-desc"] = [ 

5539 "persistent-root-volume", 

5540 "persistent-volume2", 

5541 "ephemeral-volume", 

5542 ] 

5543 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1] 

5544 expected_result = vsd 

5545 result = Ns._select_persistent_root_disk(vsd, vdu) 

5546 self.assertEqual(result, expected_result) 

5547 

5548 def test_select_persistent_root_disk_first_vsd_is_different(self): 

5549 """VDU first virtual-storage-desc is different than vsd id.""" 

5550 vdu = deepcopy(target_vdu_wth_persistent_storage) 

5551 vdu["virtual-storage-desc"] = [ 

5552 "persistent-volume2", 

5553 "persistent-root-volume", 

5554 "ephemeral-volume", 

5555 ] 

5556 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1] 

5557 expected_result = None 

5558 result = Ns._select_persistent_root_disk(vsd, vdu) 

5559 self.assertEqual(result, expected_result) 

5560 

5561 def test_select_persistent_root_disk_vsd_is_not_persistent(self): 

5562 """vsd type is not persistent.""" 

5563 vdu = deepcopy(target_vdu_wth_persistent_storage) 

5564 vdu["virtual-storage-desc"] = [ 

5565 "persistent-volume2", 

5566 "persistent-root-volume", 

5567 "ephemeral-volume", 

5568 ] 

5569 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1] 

5570 vsd["type-of-storage"] = "etsi-nfv-descriptors:ephemeral-storage" 

5571 expected_result = None 

5572 result = Ns._select_persistent_root_disk(vsd, vdu) 

5573 self.assertEqual(result, expected_result) 

5574 

5575 def test_select_persistent_root_disk_vsd_does_not_have_size(self): 

5576 """vsd size is None.""" 

5577 vdu = deepcopy(target_vdu_wth_persistent_storage) 

5578 vdu["virtual-storage-desc"] = [ 

5579 "persistent-volume2", 

5580 "persistent-root-volume", 

5581 "ephemeral-volume", 

5582 ] 

5583 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1] 

5584 vsd["size-of-storage"] = None 

5585 expected_result = None 

5586 result = Ns._select_persistent_root_disk(vsd, vdu) 

5587 self.assertEqual(result, expected_result) 

5588 

5589 def test_select_persistent_root_disk_vdu_wthout_vsd(self): 

5590 """VDU does not have virtual-storage-desc.""" 

5591 vdu = deepcopy(target_vdu_wth_persistent_storage) 

5592 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"][1] 

5593 expected_result = None 

5594 result = Ns._select_persistent_root_disk(vsd, vdu) 

5595 self.assertEqual(result, expected_result) 

5596 

5597 def test_select_persistent_root_disk_invalid_vsd_type(self): 

5598 """vsd is list, expected to be a dict.""" 

5599 vdu = deepcopy(target_vdu_wth_persistent_storage) 

5600 vsd = deepcopy(vnfd_wth_persistent_storage)["virtual-storage-desc"] 

5601 with self.assertRaises(AttributeError): 

5602 Ns._select_persistent_root_disk(vsd, vdu) 

5603 

5604 

5605class TestSFC(unittest.TestCase): 

5606 def setUp(self): 

5607 self.ns = Ns() 

5608 self.logger = CopyingMock(autospec=True) 

5609 

5610 @patch("osm_ng_ro.ns.Ns._prefix_ip_address") 

5611 @patch("osm_ng_ro.ns.Ns._process_ip_proto") 

5612 @patch("osm_ng_ro.ns.Ns._get_vnfr_vdur_text") 

5613 def test_process_classification_params( 

5614 self, mock_get_vnfr_vdur_text, mock_process_ip_proto, mock_prefix_ip_address 

5615 ): 

5616 db = Mock() 

5617 mock_prefix_ip_address.side_effect = ["10.10.10.10/32", "20.20.20.20/32"] 

5618 mock_process_ip_proto.return_value = "tcp" 

5619 mock_get_vnfr_vdur_text.return_value = "vdur_text" 

5620 vim_info, indata, target_record_id = {}, {}, "" 

5621 target_classification = { 

5622 "vnfr_id": "1234", 

5623 "source-ip-address": "10.10.10.10", 

5624 "destination-ip-address": "20.20.20.20", 

5625 "ip-proto": "6", 

5626 "id": "rule1", 

5627 "source-port": "0", 

5628 "destination-port": 5555, 

5629 "vdur_id": "5678", 

5630 "ingress_port_index": 0, 

5631 "vim_info": vim_info, 

5632 } 

5633 kwargs = {"db": db} 

5634 

5635 expected_result = { 

5636 "depends_on": ["vdur_text"], 

5637 "params": { 

5638 "destination_ip_prefix": "20.20.20.20/32", 

5639 "destination_port_range_max": 5555, 

5640 "destination_port_range_min": 5555, 

5641 "logical_source_port": "TASK-vdur_text", 

5642 "logical_source_port_index": 0, 

5643 "name": "rule1", 

5644 "protocol": "tcp", 

5645 "source_ip_prefix": "10.10.10.10/32", 

5646 "source_port_range_max": "0", 

5647 "source_port_range_min": "0", 

5648 }, 

5649 } 

5650 

5651 result = self.ns._process_classification_params( 

5652 target_classification, indata, vim_info, target_record_id, **kwargs 

5653 ) 

5654 self.assertEqual(expected_result, result) 

5655 

5656 def test_process_sfp_params(self): 

5657 sf_text = "nsrs:1234:sf.sf1" 

5658 classi_text = "nsrs:1234:classification.rule1" 

5659 vim_info, indata, target_record_id = {}, {}, "" 

5660 target_sfp = { 

5661 "id": "sfp1", 

5662 "sfs": ["sf1"], 

5663 "classifications": ["rule1"], 

5664 "vim_info": vim_info, 

5665 } 

5666 

5667 kwargs = {"nsr_id": "1234"} 

5668 

5669 expected_result = { 

5670 "depends_on": [sf_text, classi_text], 

5671 "params": { 

5672 "name": "sfp1", 

5673 "sfs": ["TASK-" + sf_text], 

5674 "classifications": ["TASK-" + classi_text], 

5675 }, 

5676 } 

5677 

5678 result = self.ns._process_sfp_params( 

5679 target_sfp, indata, vim_info, target_record_id, **kwargs 

5680 ) 

5681 self.assertEqual(expected_result, result) 

5682 

5683 def test_process_sf_params(self): 

5684 sfi_text = "nsrs::sfi.sfi1" 

5685 vim_info, indata, target_record_id = {}, {}, "" 

5686 target_sf = {"id": "sf1", "sfis": ["sfi1"], "vim_info": vim_info} 

5687 

5688 kwargs = {"ns_id": "1234"} 

5689 

5690 expected_result = { 

5691 "depends_on": [sfi_text], 

5692 "params": { 

5693 "name": "sf1", 

5694 "sfis": ["TASK-" + sfi_text], 

5695 }, 

5696 } 

5697 

5698 result = self.ns._process_sf_params( 

5699 target_sf, indata, vim_info, target_record_id, **kwargs 

5700 ) 

5701 self.assertEqual(expected_result, result) 

5702 

5703 @patch("osm_ng_ro.ns.Ns._get_vnfr_vdur_text") 

5704 def test_process_sfi_params(self, mock_get_vnfr_vdur_text): 

5705 db = Mock() 

5706 mock_get_vnfr_vdur_text.return_value = "vdur_text" 

5707 vim_info, indata, target_record_id = {}, {}, "" 

5708 target_sfi = { 

5709 "id": "sfi1", 

5710 "ingress_port": "vnf-cp0-ext", 

5711 "egress_port": "vnf-cp0-ext", 

5712 "vnfr_id": "1234", 

5713 "vdur_id": "5678", 

5714 "ingress_port_index": 0, 

5715 "egress_port_index": 0, 

5716 "vim_info": {}, 

5717 } 

5718 kwargs = {"db": db} 

5719 

5720 expected_result = { 

5721 "depends_on": ["vdur_text"], 

5722 "params": { 

5723 "name": "sfi1", 

5724 "ingress_port": "TASK-vdur_text", 

5725 "egress_port": "TASK-vdur_text", 

5726 "ingress_port_index": 0, 

5727 "egress_port_index": 0, 

5728 }, 

5729 } 

5730 

5731 result = self.ns._process_sfi_params( 

5732 target_sfi, indata, vim_info, target_record_id, **kwargs 

5733 ) 

5734 self.assertEqual(expected_result, result) 

5735 

5736 def test_process_vnfgd_sfp(self): 

5737 sfp = { 

5738 "id": "sfp1", 

5739 "position-desc-id": [ 

5740 { 

5741 "id": "position1", 

5742 "cp-profile-id": [{"id": "sf1"}], 

5743 "match-attributes": [{"id": "rule1"}], 

5744 } 

5745 ], 

5746 } 

5747 expected_result = {"id": "sfp1", "sfs": ["sf1"], "classifications": ["rule1"]} 

5748 

5749 result = self.ns._process_vnfgd_sfp(sfp) 

5750 self.assertEqual(expected_result, result) 

5751 

5752 def test_process_vnfgd_sf(self): 

5753 sf = {"id": "sf1", "constituent-profile-elements": [{"id": "sfi1", "order": 0}]} 

5754 expected_result = {"id": "sf1", "sfis": ["sfi1"]} 

5755 

5756 result = self.ns._process_vnfgd_sf(sf) 

5757 self.assertEqual(expected_result, result) 

5758 

5759 def test_process_vnfgd_sfi(self): 

5760 sfi = { 

5761 "id": "sfi1", 

5762 "constituent-base-element-id": "vnf", 

5763 "order": 0, 

5764 "ingress-constituent-cpd-id": "vnf-cp0-ext", 

5765 "egress-constituent-cpd-id": "vnf-cp0-ext", 

5766 } 

5767 db_vnfrs = { 

5768 "1234": { 

5769 "id": "1234", 

5770 "member-vnf-index-ref": "vnf", 

5771 "connection-point": [ 

5772 { 

5773 "name": "vnf-cp0-ext", 

5774 "connection-point-id": "vdu-eth0-int", 

5775 "connection-point-vdu-id": "5678", 

5776 "id": "vnf-cp0-ext", 

5777 } 

5778 ], 

5779 } 

5780 } 

5781 expected_result = { 

5782 "id": "sfi1", 

5783 "ingress_port": "vnf-cp0-ext", 

5784 "egress_port": "vnf-cp0-ext", 

5785 "vnfr_id": "1234", 

5786 "vdur_id": "5678", 

5787 "ingress_port_index": 0, 

5788 "egress_port_index": 0, 

5789 } 

5790 

5791 result = self.ns._process_vnfgd_sfi(sfi, db_vnfrs) 

5792 self.assertEqual(expected_result, result) 

5793 

5794 def test_process_vnfgd_classification(self): 

5795 classification = { 

5796 "id": "rule1", 

5797 "ip-proto": 6, 

5798 "source-ip-address": "10.10.10.10", 

5799 "destination-ip-address": "20.20.20.20", 

5800 "constituent-base-element-id": "vnf", 

5801 "constituent-cpd-id": "vnf-cp0-ext", 

5802 "destination-port": 5555, 

5803 } 

5804 db_vnfrs = { 

5805 "1234": { 

5806 "id": "1234", 

5807 "member-vnf-index-ref": "vnf", 

5808 "connection-point": [ 

5809 { 

5810 "name": "vnf-cp0-ext", 

5811 "connection-point-id": "vdu-eth0-int", 

5812 "connection-point-vdu-id": "5678", 

5813 "id": "vnf-cp0-ext", 

5814 } 

5815 ], 

5816 } 

5817 } 

5818 

5819 expected_result = { 

5820 "id": "rule1", 

5821 "ip-proto": 6, 

5822 "source-ip-address": "10.10.10.10", 

5823 "destination-ip-address": "20.20.20.20", 

5824 "destination-port": 5555, 

5825 "vnfr_id": "1234", 

5826 "vdur_id": "5678", 

5827 "ingress_port_index": 0, 

5828 "constituent-base-element-id": "vnf", 

5829 "constituent-cpd-id": "vnf-cp0-ext", 

5830 } 

5831 

5832 result = self.ns._process_vnfgd_classification(classification, db_vnfrs) 

5833 self.assertEqual(expected_result, result)