Coverage for osm_nbi/tests/test_admin_topics.py: 99%

754 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-10 20:04 +0000

1#! /usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com" 

18__date__ = "$2019-10-019" 

19 

20import unittest 

21import random 

22from unittest import TestCase 

23from unittest.mock import Mock, patch, call 

24from uuid import uuid4 

25from http import HTTPStatus 

26from time import time 

27from osm_common import dbbase, fsbase, msgbase 

28from osm_common.dbmemory import DbMemory 

29from osm_nbi import authconn, validation 

30from osm_nbi.admin_topics import ( 

31 ProjectTopicAuth, 

32 RoleTopicAuth, 

33 UserTopicAuth, 

34 CommonVimWimSdn, 

35 VcaTopic, 

36) 

37from osm_nbi.engine import EngineException 

38from osm_nbi.authconn import AuthconnNotFoundException 

39from osm_nbi.authconn_internal import AuthconnInternal 

40 

41 

42test_pid = str(uuid4()) 

43test_name = "test-user" 

44 

45 

46def norm(str): 

47 """Normalize string for checking""" 

48 return " ".join(str.strip().split()).lower() 

49 

50 

51class TestVcaTopic(TestCase): 

52 def setUp(self): 

53 self.db = Mock(dbbase.DbBase()) 

54 self.fs = Mock(fsbase.FsBase()) 

55 self.msg = Mock(msgbase.MsgBase()) 

56 self.auth = Mock(authconn.Authconn(None, None, None)) 

57 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth) 

58 

59 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new") 

60 def test_format_on_new(self, mock_super_format_on_new): 

61 content = { 

62 "_id": "id", 

63 "secret": "encrypted_secret", 

64 "cacert": "encrypted_cacert", 

65 } 

66 self.db.encrypt.side_effect = ["secret", "cacert"] 

67 mock_super_format_on_new.return_value = "1234" 

68 

69 oid = self.vca_topic.format_on_new(content) 

70 

71 self.assertEqual(oid, "1234") 

72 self.assertEqual(content["secret"], "secret") 

73 self.assertEqual(content["cacert"], "cacert") 

74 self.db.encrypt.assert_has_calls( 

75 [ 

76 call("encrypted_secret", schema_version="1.11", salt="id"), 

77 call("encrypted_cacert", schema_version="1.11", salt="id"), 

78 ] 

79 ) 

80 mock_super_format_on_new.assert_called_with(content, None, False) 

81 

82 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit") 

83 def test_format_on_edit(self, mock_super_format_on_edit): 

84 edit_content = { 

85 "_id": "id", 

86 "secret": "encrypted_secret", 

87 "cacert": "encrypted_cacert", 

88 } 

89 final_content = { 

90 "_id": "id", 

91 "schema_version": "1.11", 

92 } 

93 self.db.encrypt.side_effect = ["secret", "cacert"] 

94 mock_super_format_on_edit.return_value = "1234" 

95 

96 oid = self.vca_topic.format_on_edit(final_content, edit_content) 

97 

98 self.assertEqual(oid, "1234") 

99 self.assertEqual(final_content["secret"], "secret") 

100 self.assertEqual(final_content["cacert"], "cacert") 

101 self.db.encrypt.assert_has_calls( 

102 [ 

103 call("encrypted_secret", schema_version="1.11", salt="id"), 

104 call("encrypted_cacert", schema_version="1.11", salt="id"), 

105 ] 

106 ) 

107 mock_super_format_on_edit.assert_called() 

108 

109 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del") 

110 def test_check_conflict_on_del(self, mock_check_conflict_on_del): 

111 session = { 

112 "project_id": "project-id", 

113 "force": False, 

114 } 

115 _id = "vca-id" 

116 db_content = {} 

117 

118 self.db.get_list.return_value = None 

119 

120 self.vca_topic.check_conflict_on_del(session, _id, db_content) 

121 

122 self.db.get_list.assert_called_with( 

123 "vim_accounts", 

124 {"vca": _id, "_admin.projects_read.cont": "project-id"}, 

125 ) 

126 mock_check_conflict_on_del.assert_called_with(session, _id, db_content) 

127 

128 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del") 

129 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del): 

130 session = { 

131 "project_id": "project-id", 

132 "force": True, 

133 } 

134 _id = "vca-id" 

135 db_content = {} 

136 

137 self.vca_topic.check_conflict_on_del(session, _id, db_content) 

138 

139 self.db.get_list.assert_not_called() 

140 mock_check_conflict_on_del.assert_not_called() 

141 

142 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del") 

143 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del): 

144 session = { 

145 "project_id": "project-id", 

146 "force": False, 

147 } 

148 _id = "vca-id" 

149 db_content = {} 

150 

151 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"} 

152 

153 with self.assertRaises(EngineException) as context: 

154 self.vca_topic.check_conflict_on_del(session, _id, db_content) 

155 self.assertEqual( 

156 context.exception, 

157 EngineException( 

158 "There is at least one VIM account using this vca", 

159 http_code=HTTPStatus.CONFLICT, 

160 ), 

161 ) 

162 

163 self.db.get_list.assert_called_with( 

164 "vim_accounts", 

165 {"vca": _id, "_admin.projects_read.cont": "project-id"}, 

166 ) 

167 mock_check_conflict_on_del.assert_not_called() 

168 

169 

170class Test_ProjectTopicAuth(TestCase): 

171 @classmethod 

172 def setUpClass(cls): 

173 cls.test_name = "test-project-topic" 

174 

175 def setUp(self): 

176 self.db = Mock(dbbase.DbBase()) 

177 self.fs = Mock(fsbase.FsBase()) 

178 self.msg = Mock(msgbase.MsgBase()) 

179 self.auth = Mock(authconn.Authconn(None, None, None)) 

180 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth) 

181 self.fake_session = { 

182 "username": self.test_name, 

183 "project_id": (test_pid,), 

184 "method": None, 

185 "admin": True, 

186 "force": False, 

187 "public": False, 

188 "allow_show_user_project_role": True, 

189 } 

190 self.topic.check_quota = Mock(return_value=None) # skip quota 

191 

192 def test_new_project(self): 

193 with self.subTest(i=1): 

194 rollback = [] 

195 pid1 = str(uuid4()) 

196 self.auth.get_project_list.return_value = [] 

197 self.auth.create_project.return_value = pid1 

198 pid2, oid = self.topic.new( 

199 rollback, self.fake_session, {"name": self.test_name, "quotas": {}} 

200 ) 

201 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

202 self.assertEqual(pid2, pid1, "Wrong project identifier") 

203 content = self.auth.create_project.call_args[0][0] 

204 self.assertEqual(content["name"], self.test_name, "Wrong project name") 

205 self.assertEqual(content["quotas"], {}, "Wrong quotas") 

206 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

207 self.assertEqual( 

208 content["_admin"]["modified"], 

209 content["_admin"]["created"], 

210 "Wrong modification time", 

211 ) 

212 with self.subTest(i=2): 

213 rollback = [] 

214 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: 

215 self.topic.new( 

216 rollback, 

217 self.fake_session, 

218 {"name": "other-project-name", "quotas": {"baditems": 10}}, 

219 ) 

220 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

221 self.assertEqual( 

222 e.exception.http_code, 

223 HTTPStatus.UNPROCESSABLE_ENTITY, 

224 "Wrong HTTP status code", 

225 ) 

226 self.assertIn( 

227 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format( 

228 "baditems" 

229 ), 

230 norm(str(e.exception)), 

231 "Wrong exception text", 

232 ) 

233 

234 def test_edit_project(self): 

235 now = time() 

236 pid = str(uuid4()) 

237 proj = { 

238 "_id": pid, 

239 "name": self.test_name, 

240 "_admin": {"created": now, "modified": now}, 

241 } 

242 with self.subTest(i=1): 

243 self.auth.get_project_list.side_effect = [[proj], []] 

244 new_name = "new-project-name" 

245 quotas = { 

246 "vnfds": random.SystemRandom().randint(0, 100), 

247 "nsds": random.SystemRandom().randint(0, 100), 

248 } 

249 self.topic.edit( 

250 self.fake_session, pid, {"name": new_name, "quotas": quotas} 

251 ) 

252 _id, content = self.auth.update_project.call_args[0] 

253 self.assertEqual(_id, pid, "Wrong project identifier") 

254 self.assertEqual(content["_id"], pid, "Wrong project identifier") 

255 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") 

256 self.assertGreater( 

257 content["_admin"]["modified"], now, "Wrong modification time" 

258 ) 

259 self.assertEqual(content["name"], new_name, "Wrong project name") 

260 self.assertEqual(content["quotas"], quotas, "Wrong quotas") 

261 with self.subTest(i=2): 

262 new_name = "other-project-name" 

263 quotas = {"baditems": random.SystemRandom().randint(0, 100)} 

264 self.auth.get_project_list.side_effect = [[proj], []] 

265 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: 

266 self.topic.edit( 

267 self.fake_session, pid, {"name": new_name, "quotas": quotas} 

268 ) 

269 self.assertEqual( 

270 e.exception.http_code, 

271 HTTPStatus.UNPROCESSABLE_ENTITY, 

272 "Wrong HTTP status code", 

273 ) 

274 self.assertIn( 

275 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format( 

276 "baditems" 

277 ), 

278 norm(str(e.exception)), 

279 "Wrong exception text", 

280 ) 

281 

282 def test_conflict_on_new(self): 

283 with self.subTest(i=1): 

284 rollback = [] 

285 pid = str(uuid4()) 

286 with self.assertRaises( 

287 EngineException, msg="Accepted uuid as project name" 

288 ) as e: 

289 self.topic.new(rollback, self.fake_session, {"name": pid}) 

290 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

291 self.assertEqual( 

292 e.exception.http_code, 

293 HTTPStatus.UNPROCESSABLE_ENTITY, 

294 "Wrong HTTP status code", 

295 ) 

296 self.assertIn( 

297 "project name '{}' cannot have an uuid format".format(pid), 

298 norm(str(e.exception)), 

299 "Wrong exception text", 

300 ) 

301 with self.subTest(i=2): 

302 rollback = [] 

303 self.auth.get_project_list.return_value = [ 

304 {"_id": test_pid, "name": self.test_name} 

305 ] 

306 with self.assertRaises( 

307 EngineException, msg="Accepted existing project name" 

308 ) as e: 

309 self.topic.new(rollback, self.fake_session, {"name": self.test_name}) 

310 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

311 self.assertEqual( 

312 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

313 ) 

314 self.assertIn( 

315 "project '{}' exists".format(self.test_name), 

316 norm(str(e.exception)), 

317 "Wrong exception text", 

318 ) 

319 

320 def test_conflict_on_edit(self): 

321 with self.subTest(i=1): 

322 self.auth.get_project_list.return_value = [ 

323 {"_id": test_pid, "name": self.test_name} 

324 ] 

325 new_name = str(uuid4()) 

326 with self.assertRaises( 

327 EngineException, msg="Accepted uuid as project name" 

328 ) as e: 

329 self.topic.edit(self.fake_session, test_pid, {"name": new_name}) 

330 self.assertEqual( 

331 e.exception.http_code, 

332 HTTPStatus.UNPROCESSABLE_ENTITY, 

333 "Wrong HTTP status code", 

334 ) 

335 self.assertIn( 

336 "project name '{}' cannot have an uuid format".format(new_name), 

337 norm(str(e.exception)), 

338 "Wrong exception text", 

339 ) 

340 with self.subTest(i=2): 

341 pid = str(uuid4()) 

342 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}] 

343 with self.assertRaises( 

344 EngineException, msg="Accepted renaming of project 'admin'" 

345 ) as e: 

346 self.topic.edit(self.fake_session, pid, {"name": "new-name"}) 

347 self.assertEqual( 

348 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

349 ) 

350 self.assertIn( 

351 "you cannot rename project 'admin'", 

352 norm(str(e.exception)), 

353 "Wrong exception text", 

354 ) 

355 with self.subTest(i=3): 

356 new_name = "new-project-name" 

357 self.auth.get_project_list.side_effect = [ 

358 [{"_id": test_pid, "name": self.test_name}], 

359 [{"_id": str(uuid4()), "name": new_name}], 

360 ] 

361 with self.assertRaises( 

362 EngineException, msg="Accepted existing project name" 

363 ) as e: 

364 self.topic.edit(self.fake_session, pid, {"name": new_name}) 

365 self.assertEqual( 

366 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

367 ) 

368 self.assertIn( 

369 "project '{}' is already used".format(new_name), 

370 norm(str(e.exception)), 

371 "Wrong exception text", 

372 ) 

373 

374 def test_delete_project(self): 

375 with self.subTest(i=1): 

376 pid = str(uuid4()) 

377 self.auth.get_project.return_value = { 

378 "_id": pid, 

379 "name": "other-project-name", 

380 } 

381 self.auth.delete_project.return_value = {"deleted": 1} 

382 self.auth.get_user_list.return_value = [] 

383 self.db.get_list.return_value = [] 

384 rc = self.topic.delete(self.fake_session, pid) 

385 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info") 

386 self.assertEqual( 

387 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier" 

388 ) 

389 self.assertEqual( 

390 self.auth.delete_project.call_args[0][0], 

391 pid, 

392 "Wrong project identifier", 

393 ) 

394 

395 def test_conflict_on_del(self): 

396 with self.subTest(i=1): 

397 self.auth.get_project.return_value = { 

398 "_id": test_pid, 

399 "name": self.test_name, 

400 } 

401 with self.assertRaises( 

402 EngineException, msg="Accepted deletion of own project" 

403 ) as e: 

404 self.topic.delete(self.fake_session, self.test_name) 

405 self.assertEqual( 

406 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

407 ) 

408 self.assertIn( 

409 "you cannot delete your own project", 

410 norm(str(e.exception)), 

411 "Wrong exception text", 

412 ) 

413 with self.subTest(i=2): 

414 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"} 

415 with self.assertRaises( 

416 EngineException, msg="Accepted deletion of project 'admin'" 

417 ) as e: 

418 self.topic.delete(self.fake_session, "admin") 

419 self.assertEqual( 

420 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

421 ) 

422 self.assertIn( 

423 "you cannot delete project 'admin'", 

424 norm(str(e.exception)), 

425 "Wrong exception text", 

426 ) 

427 with self.subTest(i=3): 

428 pid = str(uuid4()) 

429 name = "other-project-name" 

430 self.auth.get_project.return_value = {"_id": pid, "name": name} 

431 self.auth.get_user_list.return_value = [ 

432 { 

433 "_id": str(uuid4()), 

434 "username": self.test_name, 

435 "project_role_mappings": [{"project": pid, "role": str(uuid4())}], 

436 } 

437 ] 

438 with self.assertRaises( 

439 EngineException, msg="Accepted deletion of used project" 

440 ) as e: 

441 self.topic.delete(self.fake_session, pid) 

442 self.assertEqual( 

443 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

444 ) 

445 self.assertIn( 

446 "project '{}' ({}) is being used by user '{}'".format( 

447 name, pid, self.test_name 

448 ), 

449 norm(str(e.exception)), 

450 "Wrong exception text", 

451 ) 

452 with self.subTest(i=4): 

453 self.auth.get_user_list.return_value = [] 

454 self.db.get_list.return_value = [ 

455 { 

456 "_id": str(uuid4()), 

457 "id": self.test_name, 

458 "_admin": {"projects_read": [pid], "projects_write": []}, 

459 } 

460 ] 

461 with self.assertRaises( 

462 EngineException, msg="Accepted deletion of used project" 

463 ) as e: 

464 self.topic.delete(self.fake_session, pid) 

465 self.assertEqual( 

466 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

467 ) 

468 self.assertIn( 

469 "project '{}' ({}) is being used by {} '{}'".format( 

470 name, pid, "vnf descriptor", self.test_name 

471 ), 

472 norm(str(e.exception)), 

473 "Wrong exception text", 

474 ) 

475 

476 

477class Test_RoleTopicAuth(TestCase): 

478 @classmethod 

479 def setUpClass(cls): 

480 cls.test_name = "test-role-topic" 

481 cls.test_operations = ["tokens:get"] 

482 

483 def setUp(self): 

484 self.db = Mock(dbbase.DbBase()) 

485 self.fs = Mock(fsbase.FsBase()) 

486 self.msg = Mock(msgbase.MsgBase()) 

487 self.auth = Mock(authconn.Authconn(None, None, None)) 

488 self.auth.role_permissions = self.test_operations 

489 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth) 

490 self.fake_session = { 

491 "username": test_name, 

492 "project_id": (test_pid,), 

493 "method": None, 

494 "admin": True, 

495 "force": False, 

496 "public": False, 

497 "allow_show_user_project_role": True, 

498 } 

499 self.topic.check_quota = Mock(return_value=None) # skip quota 

500 

501 def test_new_role(self): 

502 with self.subTest(i=1): 

503 rollback = [] 

504 rid1 = str(uuid4()) 

505 perms_in = {"tokens": True} 

506 perms_out = {"default": False, "admin": False, "tokens": True} 

507 self.auth.get_role_list.return_value = [] 

508 self.auth.create_role.return_value = rid1 

509 rid2, oid = self.topic.new( 

510 rollback, 

511 self.fake_session, 

512 {"name": self.test_name, "permissions": perms_in}, 

513 ) 

514 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

515 self.assertEqual(rid2, rid1, "Wrong project identifier") 

516 content = self.auth.create_role.call_args[0][0] 

517 self.assertEqual(content["name"], self.test_name, "Wrong role name") 

518 self.assertEqual(content["permissions"], perms_out, "Wrong permissions") 

519 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

520 self.assertEqual( 

521 content["_admin"]["modified"], 

522 content["_admin"]["created"], 

523 "Wrong modification time", 

524 ) 

525 with self.subTest(i=2): 

526 rollback = [] 

527 with self.assertRaises( 

528 EngineException, msg="Accepted wrong permissions" 

529 ) as e: 

530 self.topic.new( 

531 rollback, 

532 self.fake_session, 

533 {"name": "other-role-name", "permissions": {"projects": True}}, 

534 ) 

535 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

536 self.assertEqual( 

537 e.exception.http_code, 

538 HTTPStatus.UNPROCESSABLE_ENTITY, 

539 "Wrong HTTP status code", 

540 ) 

541 self.assertIn( 

542 "invalid permission '{}'".format("projects"), 

543 norm(str(e.exception)), 

544 "Wrong exception text", 

545 ) 

546 

547 def test_edit_role(self): 

548 now = time() 

549 rid = str(uuid4()) 

550 role = { 

551 "_id": rid, 

552 "name": self.test_name, 

553 "permissions": {"tokens": True}, 

554 "_admin": {"created": now, "modified": now}, 

555 } 

556 with self.subTest(i=1): 

557 self.auth.get_role_list.side_effect = [[role], []] 

558 self.auth.get_role.return_value = role 

559 new_name = "new-role-name" 

560 perms_in = {"tokens": False, "tokens:get": True} 

561 perms_out = { 

562 "default": False, 

563 "admin": False, 

564 "tokens": False, 

565 "tokens:get": True, 

566 } 

567 self.topic.edit( 

568 self.fake_session, rid, {"name": new_name, "permissions": perms_in} 

569 ) 

570 content = self.auth.update_role.call_args[0][0] 

571 self.assertEqual(content["_id"], rid, "Wrong role identifier") 

572 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") 

573 self.assertGreater( 

574 content["_admin"]["modified"], now, "Wrong modification time" 

575 ) 

576 self.assertEqual(content["name"], new_name, "Wrong role name") 

577 self.assertEqual(content["permissions"], perms_out, "Wrong permissions") 

578 with self.subTest(i=2): 

579 new_name = "other-role-name" 

580 perms_in = {"tokens": False, "tokens:post": True} 

581 self.auth.get_role_list.side_effect = [[role], []] 

582 with self.assertRaises( 

583 EngineException, msg="Accepted wrong permissions" 

584 ) as e: 

585 self.topic.edit( 

586 self.fake_session, rid, {"name": new_name, "permissions": perms_in} 

587 ) 

588 self.assertEqual( 

589 e.exception.http_code, 

590 HTTPStatus.UNPROCESSABLE_ENTITY, 

591 "Wrong HTTP status code", 

592 ) 

593 self.assertIn( 

594 "invalid permission '{}'".format("tokens:post"), 

595 norm(str(e.exception)), 

596 "Wrong exception text", 

597 ) 

598 

599 def test_delete_role(self): 

600 with self.subTest(i=1): 

601 rid = str(uuid4()) 

602 role = {"_id": rid, "name": "other-role-name"} 

603 self.auth.get_role_list.return_value = [role] 

604 self.auth.get_role.return_value = role 

605 self.auth.delete_role.return_value = {"deleted": 1} 

606 self.auth.get_user_list.return_value = [] 

607 rc = self.topic.delete(self.fake_session, rid) 

608 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info") 

609 self.assertEqual( 

610 self.auth.get_role_list.call_args[0][0]["_id"], 

611 rid, 

612 "Wrong role identifier", 

613 ) 

614 self.assertEqual( 

615 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier" 

616 ) 

617 self.assertEqual( 

618 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier" 

619 ) 

620 

621 def test_conflict_on_new(self): 

622 with self.subTest(i=1): 

623 rollback = [] 

624 rid = str(uuid4()) 

625 with self.assertRaises( 

626 EngineException, msg="Accepted uuid as role name" 

627 ) as e: 

628 self.topic.new(rollback, self.fake_session, {"name": rid}) 

629 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

630 self.assertEqual( 

631 e.exception.http_code, 

632 HTTPStatus.UNPROCESSABLE_ENTITY, 

633 "Wrong HTTP status code", 

634 ) 

635 self.assertIn( 

636 "role name '{}' cannot have an uuid format".format(rid), 

637 norm(str(e.exception)), 

638 "Wrong exception text", 

639 ) 

640 with self.subTest(i=2): 

641 rollback = [] 

642 self.auth.get_role_list.return_value = [ 

643 {"_id": str(uuid4()), "name": self.test_name} 

644 ] 

645 with self.assertRaises( 

646 EngineException, msg="Accepted existing role name" 

647 ) as e: 

648 self.topic.new(rollback, self.fake_session, {"name": self.test_name}) 

649 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

650 self.assertEqual( 

651 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

652 ) 

653 self.assertIn( 

654 "role name '{}' exists".format(self.test_name), 

655 norm(str(e.exception)), 

656 "Wrong exception text", 

657 ) 

658 

659 def test_conflict_on_edit(self): 

660 rid = str(uuid4()) 

661 with self.subTest(i=1): 

662 self.auth.get_role_list.return_value = [ 

663 {"_id": rid, "name": self.test_name, "permissions": {}} 

664 ] 

665 new_name = str(uuid4()) 

666 with self.assertRaises( 

667 EngineException, msg="Accepted uuid as role name" 

668 ) as e: 

669 self.topic.edit(self.fake_session, rid, {"name": new_name}) 

670 self.assertEqual( 

671 e.exception.http_code, 

672 HTTPStatus.UNPROCESSABLE_ENTITY, 

673 "Wrong HTTP status code", 

674 ) 

675 self.assertIn( 

676 "role name '{}' cannot have an uuid format".format(new_name), 

677 norm(str(e.exception)), 

678 "Wrong exception text", 

679 ) 

680 for i, role_name in enumerate(["system_admin", "project_admin"], start=2): 

681 with self.subTest(i=i): 

682 rid = str(uuid4()) 

683 self.auth.get_role.return_value = { 

684 "_id": rid, 

685 "name": role_name, 

686 "permissions": {}, 

687 } 

688 with self.assertRaises( 

689 EngineException, 

690 msg="Accepted renaming of role '{}'".format(role_name), 

691 ) as e: 

692 self.topic.edit(self.fake_session, rid, {"name": "new-name"}) 

693 self.assertEqual( 

694 e.exception.http_code, 

695 HTTPStatus.FORBIDDEN, 

696 "Wrong HTTP status code", 

697 ) 

698 self.assertIn( 

699 "you cannot rename role '{}'".format(role_name), 

700 norm(str(e.exception)), 

701 "Wrong exception text", 

702 ) 

703 with self.subTest(i=i + 1): 

704 new_name = "new-role-name" 

705 self.auth.get_role_list.side_effect = [ 

706 [{"_id": rid, "name": self.test_name, "permissions": {}}], 

707 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}], 

708 ] 

709 self.auth.get_role.return_value = { 

710 "_id": rid, 

711 "name": self.test_name, 

712 "permissions": {}, 

713 } 

714 with self.assertRaises( 

715 EngineException, msg="Accepted existing role name" 

716 ) as e: 

717 self.topic.edit(self.fake_session, rid, {"name": new_name}) 

718 self.assertEqual( 

719 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

720 ) 

721 self.assertIn( 

722 "role name '{}' exists".format(new_name), 

723 norm(str(e.exception)), 

724 "Wrong exception text", 

725 ) 

726 

727 def test_conflict_on_del(self): 

728 for i, role_name in enumerate(["system_admin", "project_admin"], start=1): 

729 with self.subTest(i=i): 

730 rid = str(uuid4()) 

731 role = {"_id": rid, "name": role_name} 

732 self.auth.get_role_list.return_value = [role] 

733 self.auth.get_role.return_value = role 

734 with self.assertRaises( 

735 EngineException, 

736 msg="Accepted deletion of role '{}'".format(role_name), 

737 ) as e: 

738 self.topic.delete(self.fake_session, rid) 

739 self.assertEqual( 

740 e.exception.http_code, 

741 HTTPStatus.FORBIDDEN, 

742 "Wrong HTTP status code", 

743 ) 

744 self.assertIn( 

745 "you cannot delete role '{}'".format(role_name), 

746 norm(str(e.exception)), 

747 "Wrong exception text", 

748 ) 

749 with self.subTest(i=i + 1): 

750 rid = str(uuid4()) 

751 name = "other-role-name" 

752 role = {"_id": rid, "name": name} 

753 self.auth.get_role_list.return_value = [role] 

754 self.auth.get_role.return_value = role 

755 self.auth.get_user_list.return_value = [ 

756 { 

757 "_id": str(uuid4()), 

758 "username": self.test_name, 

759 "project_role_mappings": [{"project": str(uuid4()), "role": rid}], 

760 } 

761 ] 

762 with self.assertRaises( 

763 EngineException, msg="Accepted deletion of used role" 

764 ) as e: 

765 self.topic.delete(self.fake_session, rid) 

766 self.assertEqual( 

767 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

768 ) 

769 self.assertIn( 

770 "role '{}' ({}) is being used by user '{}'".format( 

771 name, rid, self.test_name 

772 ), 

773 norm(str(e.exception)), 

774 "Wrong exception text", 

775 ) 

776 

777 

778class Test_UserTopicAuth(TestCase): 

779 @classmethod 

780 def setUpClass(cls): 

781 cls.test_name = "test-user-topic" 

782 cls.password = "Test@123" 

783 

784 def setUp(self): 

785 # self.db = Mock(dbbase.DbBase()) 

786 self.db = DbMemory() 

787 self.fs = Mock(fsbase.FsBase()) 

788 self.msg = Mock(msgbase.MsgBase()) 

789 self.auth = Mock(authconn.Authconn(None, None, None)) 

790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth) 

791 self.fake_session = { 

792 "username": test_name, 

793 "project_id": (test_pid,), 

794 "method": None, 

795 "admin": True, 

796 "force": False, 

797 "public": False, 

798 "allow_show_user_project_role": True, 

799 } 

800 self.topic.check_quota = Mock(return_value=None) # skip quota 

801 

802 def test_new_user(self): 

803 uid1 = str(uuid4()) 

804 pid = str(uuid4()) 

805 self.auth.get_user_list.return_value = [] 

806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"} 

807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name} 

808 with self.subTest(i=1): 

809 rollback = [] 

810 rid = str(uuid4()) 

811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"} 

812 prms_in = [{"project": "some_project", "role": "some_role"}] 

813 prms_out = [{"project": pid, "role": rid}] 

814 uid2, oid = self.topic.new( 

815 rollback, 

816 self.fake_session, 

817 { 

818 "username": self.test_name, 

819 "password": self.password, 

820 "project_role_mappings": prms_in, 

821 }, 

822 ) 

823 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

824 self.assertEqual(uid2, uid1, "Wrong project identifier") 

825 content = self.auth.create_user.call_args[0][0] 

826 self.assertEqual(content["username"], self.test_name, "Wrong project name") 

827 self.assertEqual(content["password"], self.password, "Wrong password") 

828 self.assertEqual( 

829 content["project_role_mappings"], 

830 prms_out, 

831 "Wrong project-role mappings", 

832 ) 

833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

834 self.assertEqual( 

835 content["_admin"]["modified"], 

836 content["_admin"]["created"], 

837 "Wrong modification time", 

838 ) 

839 with self.subTest(i=2): 

840 rollback = [] 

841 def_rid = str(uuid4()) 

842 def_role = {"_id": def_rid, "name": "project_admin"} 

843 self.auth.get_role.return_value = def_role 

844 self.auth.get_role_list.return_value = [def_role] 

845 prms_out = [{"project": pid, "role": def_rid}] 

846 uid2, oid = self.topic.new( 

847 rollback, 

848 self.fake_session, 

849 { 

850 "username": self.test_name, 

851 "password": self.password, 

852 "projects": ["some_project"], 

853 }, 

854 ) 

855 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

856 self.assertEqual(uid2, uid1, "Wrong project identifier") 

857 content = self.auth.create_user.call_args[0][0] 

858 self.assertEqual(content["username"], self.test_name, "Wrong project name") 

859 self.assertEqual(content["password"], self.password, "Wrong password") 

860 self.assertEqual( 

861 content["project_role_mappings"], 

862 prms_out, 

863 "Wrong project-role mappings", 

864 ) 

865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

866 self.assertEqual( 

867 content["_admin"]["modified"], 

868 content["_admin"]["created"], 

869 "Wrong modification time", 

870 ) 

871 with self.subTest(i=3): 

872 rollback = [] 

873 with self.assertRaises( 

874 EngineException, msg="Accepted wrong project-role mappings" 

875 ) as e: 

876 self.topic.new( 

877 rollback, 

878 self.fake_session, 

879 { 

880 "username": "other-project-name", 

881 "password": "Other@pwd1", 

882 "project_role_mappings": [{}], 

883 }, 

884 ) 

885 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

886 self.assertEqual( 

887 e.exception.http_code, 

888 HTTPStatus.UNPROCESSABLE_ENTITY, 

889 "Wrong HTTP status code", 

890 ) 

891 self.assertIn( 

892 "format error at '{}' '{}'".format( 

893 "project_role_mappings:{}", "'{}' is a required property" 

894 ).format(0, "project"), 

895 norm(str(e.exception)), 

896 "Wrong exception text", 

897 ) 

898 with self.subTest(i=4): 

899 rollback = [] 

900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e: 

901 self.topic.new( 

902 rollback, 

903 self.fake_session, 

904 { 

905 "username": "other-project-name", 

906 "password": "Other@pwd1", 

907 "projects": [], 

908 }, 

909 ) 

910 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

911 self.assertEqual( 

912 e.exception.http_code, 

913 HTTPStatus.UNPROCESSABLE_ENTITY, 

914 "Wrong HTTP status code", 

915 ) 

916 self.assertIn( 

917 "format error at '{}' '{}'".format( 

918 "projects", "{} should be non-empty" 

919 ).format([]), 

920 norm(str(e.exception)), 

921 "Wrong exception text", 

922 ) 

923 

924 def test_edit_user(self): 

925 now = time() 

926 uid = str(uuid4()) 

927 pid1 = str(uuid4()) 

928 rid1 = str(uuid4()) 

929 test_project = { 

930 "_id": test_pid, 

931 "name": "test", 

932 "_admin": {"created": now, "modified": now}, 

933 } 

934 self.db.create("projects", test_project) 

935 self.fake_session["user_id"] = uid 

936 self.fake_session["admin_show"] = True 

937 prms = [ 

938 { 

939 "project": pid1, 

940 "project_name": "project-1", 

941 "role": rid1, 

942 "role_name": "role-1", 

943 } 

944 ] 

945 user = { 

946 "_id": uid, 

947 "username": self.test_name, 

948 "project_role_mappings": prms, 

949 "_admin": {"created": now, "modified": now}, 

950 } 

951 with self.subTest(i=1): 

952 self.auth.get_user_list.side_effect = [[user], []] 

953 self.auth.get_user.return_value = user 

954 pid2 = str(uuid4()) 

955 rid2 = str(uuid4()) 

956 self.auth.get_project.side_effect = [ 

957 {"_id": pid2, "name": "project-2"}, 

958 {"_id": pid1, "name": "project-1"}, 

959 ] 

960 self.auth.get_role.side_effect = [ 

961 {"_id": rid2, "name": "role-2"}, 

962 {"_id": rid1, "name": "role-1"}, 

963 ] 

964 

965 role = { 

966 "_id": rid1, 

967 "name": "role-1", 

968 "permissions": {"default": False, "admin": False, "roles": True}, 

969 } 

970 self.db.create("users", user) 

971 self.db.create("roles", role) 

972 new_name = "new-user-name" 

973 new_pasw = "New@pwd1" 

974 add_prms = [{"project": pid2, "role": rid2}] 

975 rem_prms = [{"project": pid1, "role": rid1}] 

976 self.topic.edit( 

977 self.fake_session, 

978 uid, 

979 { 

980 "username": new_name, 

981 "password": new_pasw, 

982 "add_project_role_mappings": add_prms, 

983 "remove_project_role_mappings": rem_prms, 

984 }, 

985 ) 

986 content = self.auth.update_user.call_args[0][0] 

987 self.assertEqual(content["_id"], uid, "Wrong user identifier") 

988 self.assertEqual(content["username"], new_name, "Wrong user name") 

989 self.assertEqual(content["password"], new_pasw, "Wrong user password") 

990 self.assertEqual( 

991 content["add_project_role_mappings"], 

992 add_prms, 

993 "Wrong project-role mappings to add", 

994 ) 

995 self.assertEqual( 

996 content["remove_project_role_mappings"], 

997 prms, 

998 "Wrong project-role mappings to remove", 

999 ) 

1000 with self.subTest(i=2): 

1001 new_name = "other-user-name" 

1002 new_prms = [{}] 

1003 self.auth.get_role_list.side_effect = [[user], []] 

1004 self.auth.get_user_list.side_effect = [[user]] 

1005 with self.assertRaises( 

1006 EngineException, msg="Accepted wrong project-role mappings" 

1007 ) as e: 

1008 self.topic.edit( 

1009 self.fake_session, 

1010 uid, 

1011 {"username": new_name, "project_role_mappings": new_prms}, 

1012 ) 

1013 self.assertEqual( 

1014 e.exception.http_code, 

1015 HTTPStatus.UNPROCESSABLE_ENTITY, 

1016 "Wrong HTTP status code", 

1017 ) 

1018 self.assertIn( 

1019 "format error at '{}' '{}'".format( 

1020 "project_role_mappings:{}", "'{}' is a required property" 

1021 ).format(0, "project"), 

1022 norm(str(e.exception)), 

1023 "Wrong exception text", 

1024 ) 

1025 with self.subTest(i=3): 

1026 self.auth.get_user_list.side_effect = [[user], []] 

1027 self.auth.get_user.return_value = user 

1028 old_password = self.password 

1029 new_pasw = "New@pwd1" 

1030 self.topic.edit( 

1031 self.fake_session, 

1032 uid, 

1033 { 

1034 "old_password": old_password, 

1035 "password": new_pasw, 

1036 }, 

1037 ) 

1038 content = self.auth.update_user.call_args[0][0] 

1039 self.assertEqual( 

1040 content["old_password"], old_password, "Wrong old password" 

1041 ) 

1042 self.assertEqual(content["password"], new_pasw, "Wrong user password") 

1043 

1044 def test_delete_user(self): 

1045 with self.subTest(i=1): 

1046 uid = str(uuid4()) 

1047 self.fake_session["username"] = self.test_name 

1048 user = user = { 

1049 "_id": uid, 

1050 "username": "other-user-name", 

1051 "project_role_mappings": [], 

1052 } 

1053 self.auth.get_user.return_value = user 

1054 self.auth.delete_user.return_value = {"deleted": 1} 

1055 rc = self.topic.delete(self.fake_session, uid) 

1056 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info") 

1057 self.assertEqual( 

1058 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier" 

1059 ) 

1060 self.assertEqual( 

1061 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier" 

1062 ) 

1063 

1064 def test_conflict_on_new(self): 

1065 with self.subTest(i=1): 

1066 rollback = [] 

1067 uid = str(uuid4()) 

1068 with self.assertRaises( 

1069 EngineException, msg="Accepted uuid as username" 

1070 ) as e: 

1071 self.topic.new( 

1072 rollback, 

1073 self.fake_session, 

1074 { 

1075 "username": uid, 

1076 "password": self.password, 

1077 "projects": [test_pid], 

1078 }, 

1079 ) 

1080 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1081 self.assertEqual( 

1082 e.exception.http_code, 

1083 HTTPStatus.UNPROCESSABLE_ENTITY, 

1084 "Wrong HTTP status code", 

1085 ) 

1086 self.assertIn( 

1087 "username '{}' cannot have a uuid format".format(uid), 

1088 norm(str(e.exception)), 

1089 "Wrong exception text", 

1090 ) 

1091 with self.subTest(i=2): 

1092 rollback = [] 

1093 self.auth.get_user_list.return_value = [ 

1094 {"_id": str(uuid4()), "username": self.test_name} 

1095 ] 

1096 with self.assertRaises( 

1097 EngineException, msg="Accepted existing username" 

1098 ) as e: 

1099 self.topic.new( 

1100 rollback, 

1101 self.fake_session, 

1102 { 

1103 "username": self.test_name, 

1104 "password": self.password, 

1105 "projects": [test_pid], 

1106 }, 

1107 ) 

1108 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1109 self.assertEqual( 

1110 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1111 ) 

1112 self.assertIn( 

1113 "username '{}' is already used".format(self.test_name), 

1114 norm(str(e.exception)), 

1115 "Wrong exception text", 

1116 ) 

1117 with self.subTest(i=3): 

1118 rollback = [] 

1119 self.auth.get_user_list.return_value = [] 

1120 self.auth.get_role_list.side_effect = [[], []] 

1121 with self.assertRaises( 

1122 AuthconnNotFoundException, msg="Accepted user without default role" 

1123 ) as e: 

1124 self.topic.new( 

1125 rollback, 

1126 self.fake_session, 

1127 { 

1128 "username": self.test_name, 

1129 "password": self.password, 

1130 "projects": [str(uuid4())], 

1131 }, 

1132 ) 

1133 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1134 self.assertEqual( 

1135 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code" 

1136 ) 

1137 self.assertIn( 

1138 "can't find default role for user '{}'".format(self.test_name), 

1139 norm(str(e.exception)), 

1140 "Wrong exception text", 

1141 ) 

1142 

1143 def test_conflict_on_edit(self): 

1144 uid = str(uuid4()) 

1145 with self.subTest(i=1): 

1146 self.auth.get_user_list.return_value = [ 

1147 {"_id": uid, "username": self.test_name} 

1148 ] 

1149 new_name = str(uuid4()) 

1150 with self.assertRaises( 

1151 EngineException, msg="Accepted uuid as username" 

1152 ) as e: 

1153 self.topic.edit(self.fake_session, uid, {"username": new_name}) 

1154 self.assertEqual( 

1155 e.exception.http_code, 

1156 HTTPStatus.UNPROCESSABLE_ENTITY, 

1157 "Wrong HTTP status code", 

1158 ) 

1159 self.assertIn( 

1160 "username '{}' cannot have an uuid format".format(new_name), 

1161 norm(str(e.exception)), 

1162 "Wrong exception text", 

1163 ) 

1164 with self.subTest(i=2): 

1165 self.auth.get_user_list.return_value = [ 

1166 {"_id": uid, "username": self.test_name} 

1167 ] 

1168 self.auth.get_role_list.side_effect = [[], []] 

1169 with self.assertRaises( 

1170 AuthconnNotFoundException, msg="Accepted user without default role" 

1171 ) as e: 

1172 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]}) 

1173 self.assertEqual( 

1174 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code" 

1175 ) 

1176 self.assertIn( 

1177 "can't find a default role for user '{}'".format(self.test_name), 

1178 norm(str(e.exception)), 

1179 "Wrong exception text", 

1180 ) 

1181 with self.subTest(i=3): 

1182 admin_uid = str(uuid4()) 

1183 self.auth.get_user_list.return_value = [ 

1184 {"_id": admin_uid, "username": "admin"} 

1185 ] 

1186 with self.assertRaises( 

1187 EngineException, 

1188 msg="Accepted removing system_admin role from admin user", 

1189 ) as e: 

1190 self.topic.edit( 

1191 self.fake_session, 

1192 admin_uid, 

1193 { 

1194 "remove_project_role_mappings": [ 

1195 {"project": "admin", "role": "system_admin"} 

1196 ] 

1197 }, 

1198 ) 

1199 self.assertEqual( 

1200 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code" 

1201 ) 

1202 self.assertIn( 

1203 "you cannot remove system_admin role from admin user", 

1204 norm(str(e.exception)), 

1205 "Wrong exception text", 

1206 ) 

1207 with self.subTest(i=4): 

1208 new_name = "new-user-name" 

1209 self.auth.get_user_list.side_effect = [ 

1210 [{"_id": uid, "name": self.test_name}], 

1211 [{"_id": str(uuid4()), "name": new_name}], 

1212 ] 

1213 with self.assertRaises( 

1214 EngineException, msg="Accepted existing username" 

1215 ) as e: 

1216 self.topic.edit(self.fake_session, uid, {"username": new_name}) 

1217 self.assertEqual( 

1218 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1219 ) 

1220 self.assertIn( 

1221 "username '{}' is already used".format(new_name), 

1222 norm(str(e.exception)), 

1223 "Wrong exception text", 

1224 ) 

1225 

1226 def test_conflict_on_del(self): 

1227 with self.subTest(i=1): 

1228 uid = str(uuid4()) 

1229 self.fake_session["username"] = self.test_name 

1230 user = user = { 

1231 "_id": uid, 

1232 "username": self.test_name, 

1233 "project_role_mappings": [], 

1234 } 

1235 self.auth.get_user.return_value = user 

1236 with self.assertRaises( 

1237 EngineException, msg="Accepted deletion of own user" 

1238 ) as e: 

1239 self.topic.delete(self.fake_session, uid) 

1240 self.assertEqual( 

1241 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1242 ) 

1243 self.assertIn( 

1244 "you cannot delete your own login user", 

1245 norm(str(e.exception)), 

1246 "Wrong exception text", 

1247 ) 

1248 

1249 def test_user_management(self): 

1250 self.config = { 

1251 "user_management": True, 

1252 "pwd_expire_days": 30, 

1253 "max_pwd_attempt": 5, 

1254 "account_expire_days": 90, 

1255 "version": "dev", 

1256 "deviceVendor": "test", 

1257 "deviceProduct": "test", 

1258 } 

1259 self.permissions = {"admin": True, "default": True} 

1260 now = time() 

1261 rid = str(uuid4()) 

1262 role = { 

1263 "_id": rid, 

1264 "name": self.test_name, 

1265 "permissions": self.permissions, 

1266 "_admin": {"created": now, "modified": now}, 

1267 } 

1268 self.db.create("roles", role) 

1269 admin_user = { 

1270 "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", 

1271 "username": "admin", 

1272 "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb", 

1273 "_admin": { 

1274 "created": 1663058370.7721832, 

1275 "modified": 1663681183.5651639, 

1276 "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b", 

1277 "last_token_time": 1666876472.2962265, 

1278 "user_status": "always-active", 

1279 "retry_count": 0, 

1280 }, 

1281 "project_role_mappings": [ 

1282 {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid} 

1283 ], 

1284 } 

1285 self.db.create("users", admin_user) 

1286 with self.subTest(i=1): 

1287 self.user_create = AuthconnInternal(self.config, self.db, self.permissions) 

1288 user_info = {"username": "user_mgmt_true", "password": "Test@123"} 

1289 self.user_create.create_user(user_info) 

1290 user = self.db.get_one("users", {"username": user_info["username"]}) 

1291 self.assertEqual(user["username"], user_info["username"], "Wrong user name") 

1292 self.assertEqual( 

1293 user["_admin"]["user_status"], "active", "User status is unknown" 

1294 ) 

1295 self.assertIn("password_expire_time", user["_admin"], "Key is not there") 

1296 self.assertIn("account_expire_time", user["_admin"], "Key is not there") 

1297 with self.subTest(i=2): 

1298 self.user_update = AuthconnInternal(self.config, self.db, self.permissions) 

1299 locked_user = { 

1300 "username": "user_lock", 

1301 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d", 

1302 "_admin": { 

1303 "created": 1667207552.2191198, 

1304 "modified": 1667207552.2191815, 

1305 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102", 

1306 "user_status": "locked", 

1307 "password_expire_time": 1667207552.2191815, 

1308 "account_expire_time": now + 60, 

1309 "retry_count": 5, 

1310 "last_token_time": 1667207552.2191815, 

1311 }, 

1312 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6", 

1313 } 

1314 self.db.create("users", locked_user) 

1315 user_info = { 

1316 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6", 

1317 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", 

1318 "unlock": True, 

1319 } 

1320 self.assertEqual( 

1321 locked_user["_admin"]["user_status"], "locked", "User status is unknown" 

1322 ) 

1323 self.user_update.update_user(user_info) 

1324 user = self.db.get_one("users", {"username": locked_user["username"]}) 

1325 self.assertEqual( 

1326 user["username"], locked_user["username"], "Wrong user name" 

1327 ) 

1328 self.assertEqual( 

1329 user["_admin"]["user_status"], "active", "User status is unknown" 

1330 ) 

1331 self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown") 

1332 with self.subTest(i=3): 

1333 self.user_update = AuthconnInternal(self.config, self.db, self.permissions) 

1334 expired_user = { 

1335 "username": "user_expire", 

1336 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d", 

1337 "_admin": { 

1338 "created": 1665602087.601298, 

1339 "modified": 1665636442.1245084, 

1340 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102", 

1341 "user_status": "expired", 

1342 "password_expire_time": 1668248628.2191815, 

1343 "account_expire_time": 1666952628.2191815, 

1344 "retry_count": 0, 

1345 "last_token_time": 1666779828.2171815, 

1346 }, 

1347 "_id": "3266430f-8222-407f-b08f-3a242504ab94", 

1348 } 

1349 self.db.create("users", expired_user) 

1350 user_info = { 

1351 "_id": "3266430f-8222-407f-b08f-3a242504ab94", 

1352 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", 

1353 "renew": True, 

1354 } 

1355 self.assertEqual( 

1356 expired_user["_admin"]["user_status"], 

1357 "expired", 

1358 "User status is unknown", 

1359 ) 

1360 self.user_update.update_user(user_info) 

1361 user = self.db.get_one("users", {"username": expired_user["username"]}) 

1362 self.assertEqual( 

1363 user["username"], expired_user["username"], "Wrong user name" 

1364 ) 

1365 self.assertEqual( 

1366 user["_admin"]["user_status"], "active", "User status is unknown" 

1367 ) 

1368 self.assertGreater( 

1369 user["_admin"]["account_expire_time"], 

1370 expired_user["_admin"]["account_expire_time"], 

1371 "User expire time is not get extended", 

1372 ) 

1373 with self.subTest(i=4): 

1374 self.config.update({"user_management": False}) 

1375 self.user_create = AuthconnInternal(self.config, self.db, self.permissions) 

1376 user_info = {"username": "user_mgmt_false", "password": "Test@123"} 

1377 self.user_create.create_user(user_info) 

1378 user = self.db.get_one("users", {"username": user_info["username"]}) 

1379 self.assertEqual(user["username"], user_info["username"], "Wrong user name") 

1380 self.assertEqual( 

1381 user["_admin"]["user_status"], "active", "User status is unknown" 

1382 ) 

1383 self.assertNotIn("password_expire_time", user["_admin"], "Key is not there") 

1384 self.assertNotIn("account_expire_time", user["_admin"], "Key is not there") 

1385 

1386 

1387class Test_CommonVimWimSdn(TestCase): 

1388 @classmethod 

1389 def setUpClass(cls): 

1390 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager 

1391 

1392 def setUp(self): 

1393 self.db = Mock(dbbase.DbBase()) 

1394 self.fs = Mock(fsbase.FsBase()) 

1395 self.msg = Mock(msgbase.MsgBase()) 

1396 self.auth = Mock(authconn.Authconn(None, None, None)) 

1397 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth) 

1398 # Use WIM schemas for testing because they are the simplest 

1399 self.topic._send_msg = Mock() 

1400 self.topic.topic = "wims" 

1401 self.topic.schema_new = validation.wim_account_new_schema 

1402 self.topic.schema_edit = validation.wim_account_edit_schema 

1403 self.fake_session = { 

1404 "username": test_name, 

1405 "project_id": (test_pid,), 

1406 "method": None, 

1407 "admin": True, 

1408 "force": False, 

1409 "public": False, 

1410 "allow_show_user_project_role": True, 

1411 } 

1412 self.topic.check_quota = Mock(return_value=None) # skip quota 

1413 

1414 def test_new_cvws(self): 

1415 test_url = "http://0.0.0.0:0" 

1416 with self.subTest(i=1): 

1417 rollback = [] 

1418 test_type = "fake" 

1419 self.db.get_one.return_value = None 

1420 self.db.create.side_effect = lambda self, content: content["_id"] 

1421 cid, oid = self.topic.new( 

1422 rollback, 

1423 self.fake_session, 

1424 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}, 

1425 ) 

1426 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

1427 args = self.db.create.call_args[0] 

1428 content = args[1] 

1429 self.assertEqual(args[0], self.topic.topic, "Wrong topic") 

1430 self.assertEqual(content["_id"], cid, "Wrong CIM identifier") 

1431 self.assertEqual(content["name"], self.test_name, "Wrong CIM name") 

1432 self.assertEqual(content["wim_url"], test_url, "Wrong URL") 

1433 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type") 

1434 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version") 

1435 self.assertEqual(content["op_id"], oid, "Wrong operation identifier") 

1436 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

1437 self.assertEqual( 

1438 content["_admin"]["modified"], 

1439 content["_admin"]["created"], 

1440 "Wrong modification time", 

1441 ) 

1442 self.assertEqual( 

1443 content["_admin"]["operationalState"], 

1444 "PROCESSING", 

1445 "Wrong operational state", 

1446 ) 

1447 self.assertEqual( 

1448 content["_admin"]["projects_read"], 

1449 [test_pid], 

1450 "Wrong read-only projects", 

1451 ) 

1452 self.assertEqual( 

1453 content["_admin"]["projects_write"], 

1454 [test_pid], 

1455 "Wrong read/write projects", 

1456 ) 

1457 self.assertIsNone( 

1458 content["_admin"]["current_operation"], "Wrong current operation" 

1459 ) 

1460 self.assertEqual( 

1461 len(content["_admin"]["operations"]), 1, "Wrong number of operations" 

1462 ) 

1463 operation = content["_admin"]["operations"][0] 

1464 self.assertEqual( 

1465 operation["lcmOperationType"], "create", "Wrong operation type" 

1466 ) 

1467 self.assertEqual( 

1468 operation["operationState"], "PROCESSING", "Wrong operation state" 

1469 ) 

1470 self.assertGreater( 

1471 operation["startTime"], 

1472 content["_admin"]["created"], 

1473 "Wrong operation start time", 

1474 ) 

1475 self.assertGreater( 

1476 operation["statusEnteredTime"], 

1477 content["_admin"]["created"], 

1478 "Wrong operation status enter time", 

1479 ) 

1480 self.assertEqual( 

1481 operation["detailed-status"], "", "Wrong operation detailed status info" 

1482 ) 

1483 self.assertIsNone( 

1484 operation["operationParams"], "Wrong operation parameters" 

1485 ) 

1486 # This test is disabled. From Feature 8030 we admit all WIM/SDN types 

1487 # with self.subTest(i=2): 

1488 # rollback = [] 

1489 # test_type = "bad_type" 

1490 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e: 

1491 # self.topic.new(rollback, self.fake_session, 

1492 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) 

1493 # self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1494 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") 

1495 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""), 

1496 # norm(str(e.exception)), "Wrong exception text") 

1497 

1498 def test_conflict_on_new(self): 

1499 with self.subTest(i=1): 

1500 rollback = [] 

1501 test_url = "http://0.0.0.0:0" 

1502 test_type = "fake" 

1503 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name} 

1504 with self.assertRaises( 

1505 EngineException, msg="Accepted existing CIM name" 

1506 ) as e: 

1507 self.topic.new( 

1508 rollback, 

1509 self.fake_session, 

1510 { 

1511 "name": self.test_name, 

1512 "wim_url": test_url, 

1513 "wim_type": test_type, 

1514 }, 

1515 ) 

1516 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1517 self.assertEqual( 

1518 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1519 ) 

1520 self.assertIn( 

1521 "name '{}' already exists for {}".format( 

1522 self.test_name, self.topic.topic 

1523 ), 

1524 norm(str(e.exception)), 

1525 "Wrong exception text", 

1526 ) 

1527 

1528 def test_edit_cvws(self): 

1529 now = time() 

1530 cid = str(uuid4()) 

1531 test_url = "http://0.0.0.0:0" 

1532 test_type = "fake" 

1533 cvws = { 

1534 "_id": cid, 

1535 "name": self.test_name, 

1536 "wim_url": test_url, 

1537 "wim_type": test_type, 

1538 "_admin": { 

1539 "created": now, 

1540 "modified": now, 

1541 "operations": [{"lcmOperationType": "create"}], 

1542 }, 

1543 } 

1544 with self.subTest(i=1): 

1545 new_name = "new-cim-name" 

1546 new_url = "https://1.1.1.1:1" 

1547 new_type = "onos" 

1548 self.db.get_one.side_effect = [cvws, None] 

1549 self.db.replace.return_value = {"updated": 1} 

1550 # self.db.encrypt.side_effect = [b64str(), b64str()] 

1551 self.topic.edit( 

1552 self.fake_session, 

1553 cid, 

1554 {"name": new_name, "wim_url": new_url, "wim_type": new_type}, 

1555 ) 

1556 args = self.db.replace.call_args[0] 

1557 content = args[2] 

1558 self.assertEqual(args[0], self.topic.topic, "Wrong topic") 

1559 self.assertEqual(args[1], cid, "Wrong CIM identifier") 

1560 self.assertEqual(content["_id"], cid, "Wrong CIM identifier") 

1561 self.assertEqual(content["name"], new_name, "Wrong CIM name") 

1562 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type") 

1563 self.assertEqual(content["wim_url"], new_url, "Wrong URL") 

1564 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") 

1565 self.assertGreater( 

1566 content["_admin"]["modified"], 

1567 content["_admin"]["created"], 

1568 "Wrong modification time", 

1569 ) 

1570 self.assertEqual( 

1571 len(content["_admin"]["operations"]), 2, "Wrong number of operations" 

1572 ) 

1573 operation = content["_admin"]["operations"][1] 

1574 self.assertEqual( 

1575 operation["lcmOperationType"], "edit", "Wrong operation type" 

1576 ) 

1577 self.assertEqual( 

1578 operation["operationState"], "PROCESSING", "Wrong operation state" 

1579 ) 

1580 self.assertGreater( 

1581 operation["startTime"], 

1582 content["_admin"]["modified"], 

1583 "Wrong operation start time", 

1584 ) 

1585 self.assertGreater( 

1586 operation["statusEnteredTime"], 

1587 content["_admin"]["modified"], 

1588 "Wrong operation status enter time", 

1589 ) 

1590 self.assertEqual( 

1591 operation["detailed-status"], "", "Wrong operation detailed status info" 

1592 ) 

1593 self.assertIsNone( 

1594 operation["operationParams"], "Wrong operation parameters" 

1595 ) 

1596 with self.subTest(i=2): 

1597 self.db.get_one.side_effect = [cvws] 

1598 with self.assertRaises(EngineException, msg="Accepted wrong property") as e: 

1599 self.topic.edit( 

1600 self.fake_session, 

1601 str(uuid4()), 

1602 {"name": "new-name", "extra_prop": "anything"}, 

1603 ) 

1604 self.assertEqual( 

1605 e.exception.http_code, 

1606 HTTPStatus.UNPROCESSABLE_ENTITY, 

1607 "Wrong HTTP status code", 

1608 ) 

1609 self.assertIn( 

1610 "format error '{}'".format( 

1611 "additional properties are not allowed ('{}' was unexpected)" 

1612 ).format("extra_prop"), 

1613 norm(str(e.exception)), 

1614 "Wrong exception text", 

1615 ) 

1616 

1617 def test_conflict_on_edit(self): 

1618 with self.subTest(i=1): 

1619 cid = str(uuid4()) 

1620 new_name = "new-cim-name" 

1621 self.db.get_one.side_effect = [ 

1622 {"_id": cid, "name": self.test_name}, 

1623 {"_id": str(uuid4()), "name": new_name}, 

1624 ] 

1625 with self.assertRaises( 

1626 EngineException, msg="Accepted existing CIM name" 

1627 ) as e: 

1628 self.topic.edit(self.fake_session, cid, {"name": new_name}) 

1629 self.assertEqual( 

1630 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1631 ) 

1632 self.assertIn( 

1633 "name '{}' already exists for {}".format(new_name, self.topic.topic), 

1634 norm(str(e.exception)), 

1635 "Wrong exception text", 

1636 ) 

1637 

1638 def test_delete_cvws(self): 

1639 cid = str(uuid4()) 

1640 ro_pid = str(uuid4()) 

1641 rw_pid = str(uuid4()) 

1642 cvws = {"_id": cid, "name": self.test_name} 

1643 self.db.get_list.return_value = [] 

1644 with self.subTest(i=1): 

1645 cvws["_admin"] = { 

1646 "projects_read": [test_pid, ro_pid, rw_pid], 

1647 "projects_write": [test_pid, rw_pid], 

1648 } 

1649 self.db.get_one.return_value = cvws 

1650 oid = self.topic.delete(self.fake_session, cid) 

1651 self.assertIsNone(oid, "Wrong operation identifier") 

1652 self.assertEqual( 

1653 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1654 ) 

1655 self.assertEqual( 

1656 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1657 ) 

1658 self.assertEqual( 

1659 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1660 ) 

1661 self.assertEqual( 

1662 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1663 ) 

1664 self.assertEqual( 

1665 self.db.set_one.call_args[1]["update_dict"], 

1666 None, 

1667 "Wrong read-only projects update", 

1668 ) 

1669 self.assertEqual( 

1670 self.db.set_one.call_args[1]["pull_list"], 

1671 { 

1672 "_admin.projects_read": (test_pid,), 

1673 "_admin.projects_write": (test_pid,), 

1674 }, 

1675 "Wrong read/write projects update", 

1676 ) 

1677 self.topic._send_msg.assert_not_called() 

1678 with self.subTest(i=2): 

1679 now = time() 

1680 cvws["_admin"] = { 

1681 "projects_read": [test_pid], 

1682 "projects_write": [test_pid], 

1683 "operations": [], 

1684 } 

1685 self.db.get_one.return_value = cvws 

1686 oid = self.topic.delete(self.fake_session, cid) 

1687 self.assertEqual(oid, cid + ":0", "Wrong operation identifier") 

1688 self.assertEqual( 

1689 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1690 ) 

1691 self.assertEqual( 

1692 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1693 ) 

1694 self.assertEqual( 

1695 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1696 ) 

1697 self.assertEqual( 

1698 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier" 

1699 ) 

1700 self.assertEqual( 

1701 self.db.set_one.call_args[1]["update_dict"], 

1702 {"_admin.to_delete": True}, 

1703 "Wrong _admin.to_delete update", 

1704 ) 

1705 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"] 

1706 self.assertEqual( 

1707 operation["lcmOperationType"], "delete", "Wrong operation type" 

1708 ) 

1709 self.assertEqual( 

1710 operation["operationState"], "PROCESSING", "Wrong operation state" 

1711 ) 

1712 self.assertEqual( 

1713 operation["detailed-status"], "", "Wrong operation detailed status" 

1714 ) 

1715 self.assertIsNone( 

1716 operation["operationParams"], "Wrong operation parameters" 

1717 ) 

1718 self.assertGreater( 

1719 operation["startTime"], now, "Wrong operation start time" 

1720 ) 

1721 self.assertGreater( 

1722 operation["statusEnteredTime"], now, "Wrong operation status enter time" 

1723 ) 

1724 self.topic._send_msg.assert_called_once_with( 

1725 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None 

1726 ) 

1727 with self.subTest(i=3): 

1728 cvws["_admin"] = { 

1729 "projects_read": [], 

1730 "projects_write": [], 

1731 "operations": [], 

1732 } 

1733 self.db.get_one.return_value = cvws 

1734 self.topic._send_msg.reset_mock() 

1735 self.db.get_one.reset_mock() 

1736 self.db.del_one.reset_mock() 

1737 self.fake_session["force"] = True # to force deletion 

1738 self.fake_session["admin"] = True # to force deletion 

1739 self.fake_session["project_id"] = [] # to force deletion 

1740 oid = self.topic.delete(self.fake_session, cid) 

1741 self.assertIsNone(oid, "Wrong operation identifier") 

1742 self.assertEqual( 

1743 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1744 ) 

1745 self.assertEqual( 

1746 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1747 ) 

1748 self.assertEqual( 

1749 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1750 ) 

1751 self.assertEqual( 

1752 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1753 ) 

1754 self.topic._send_msg.assert_called_once_with( 

1755 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None 

1756 ) 

1757 

1758 

1759if __name__ == "__main__": 

1760 unittest.main()